
pyflink_learn's Introduction

PyFlink: From Beginner to Mastery

A PyFlink learning guide that helps you get started quickly through a series of small, hands-on exercises.

1. Setting Up the Local Development Environment

1.1 Installing Flink

1.1.1 Mac

First, make sure your local Java is version 8 or 11:

java -version
# you may see something like: java version "1.8.0_111"

Then install Flink with Homebrew. At the time of writing, the latest Flink release is 1.11.2:

brew install apache-flink
# if multiple versions are installed, pin one with: brew switch apache-flink 1.11.2

cd into /usr/local/Cellar/apache-flink/1.11.2/libexec/bin and start Flink:

cd /usr/local/Cellar/apache-flink/1.11.2/libexec/bin
sh start-cluster.sh

Once it has started, run the jps command to see all local Java processes. If Flink is installed correctly, you should see the two processes TaskManagerRunner and StandaloneSessionClusterEntrypoint, which means both the jobmanager and the taskmanager are up.

You can now also open http://localhost:8081/ to see Flink's job management dashboard. It should show Available Task Slots as 1 (there is one taskmanager with a single task slot, i.e. a parallelism of 1) and Running Jobs as 0 (no Flink job is currently executing).

To shut Flink down:

sh stop-cluster.sh

For convenience, you can edit your local ~/.bash_profile, add the 3 lines below (adjust the version if needed), and then run source ~/.bash_profile to activate the changes.

alias start-flink='/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/start-cluster.sh'
alias stop-flink='/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/stop-cluster.sh'
alias flink='/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/flink'

1.1.2 Other Systems

Please refer to the official documentation.

1.2 Installing Other Components

This tutorial uses MySQL, Kafka, Zookeeper, and other database and big-data components. To keep deployment and management uniform, Docker is used here.

From a development standpoint, what matters most is standing up a runnable environment as quickly as possible. Docker is used here for three reasons:

  1. Docker keeps the development environment consistent with the production environment.
  2. Docker can simulate a multi-node cluster: with the docker-compose tool we can easily start multiple Kafka and Zookeeper containers on a single development machine, which makes it very convenient to mimic a distributed environment.
  3. Docker installs and starts very quickly.

First, install Docker.

Then, in the root directory of this tutorial's project, start the Docker services:

# on Windows, run this line first:
# set COMPOSE_CONVERT_WINDOWS_PATHS=1
docker-compose up -d

Once everything is up, docker ps shows the following 6 containers:

CONTAINER ID        IMAGE                           COMMAND                  CREATED             STATUS              PORTS                                                  NAMES
32d6b6cdf30b        mysql:8.0.22                    "docker-entrypoint.s…"   5 days ago          Up 3 seconds        0.0.0.0:3306->3306/tcp, 33060/tcp                      mysql1
cc8246824903        mysql:8.0.22                    "docker-entrypoint.s…"   5 days ago          Up 3 seconds        33060/tcp, 0.0.0.0:3307->3306/tcp                      mysql2
f732effb7559        redis:6.0.9                     "docker-entrypoint.s…"   5 days ago          Up 5 seconds        0.0.0.0:6379->6379/tcp                                 redis
b62b8d8363c3        wurstmeister/kafka:2.13-2.6.0   "start-kafka.sh"         5 days ago          Up 3 seconds        0.0.0.0:9092->9092/tcp                                 kafka
fe2ad0230ffa        adminer                         "entrypoint.sh docke…"   5 days ago          Up 12 seconds       0.0.0.0:8080->8080/tcp                                 adminer
df80ca04755d        zookeeper:3.6.2                 "/docker-entrypoint.…"   5 days ago          Up 3 seconds        2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper

What each container is for:

  • mysql + adminer: used in case 3. There are two MySQL containers: mysql1 serves as the source database to be synchronized, and mysql2 as the backup warehouse. The adminer container lets you inspect and operate the MySQL containers from a web page (just in case no MySQL client is installed locally).
  • kafka + zookeeper: used in case 4. Kafka is a high-throughput, low-latency message broker, widely used in business systems; if it is unfamiliar, think of it simply as a data store. It is an essential component for real-time stream processing, and this tutorial uses separate topics to store the raw data and the result data in real time. Zookeeper is usually deployed alongside Kafka to manage Kafka's brokers and help with load balancing; put simply, it makes Kafka run more efficiently.
  • redis: used in case 5. Redis is a high-performance, in-memory, non-relational key-value database. It supports several data types and offers very fast reads and writes, which makes it a convenient cache for trained models in real-time computation.

Note: for access control, docker-compose.yml sets passwords for some components:

  1. MySQL's username and password are both root.
  2. Redis's password is redis_password.
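
To quickly verify that the containers are reachable with these settings, a minimal connectivity check might look like the sketch below (it assumes the pymysql, redis, and kafka-python packages are installed; hosts, ports, and credentials follow the docker-compose.yml described above):

# Connectivity smoke test for the docker-compose services (a sketch, not part of the tutorial code).
# Assumes: pip install pymysql redis kafka-python
import pymysql
import redis
from kafka import KafkaProducer

# mysql1 is mapped to port 3306 and mysql2 to 3307, both with root/root
for port in (3306, 3307):
    conn = pymysql.connect(host='localhost', port=port, user='root', password='root')
    with conn.cursor() as cur:
        cur.execute('SELECT VERSION()')
        print('MySQL on port', port, '->', cur.fetchone()[0])
    conn.close()

# Redis is mapped to 6379 and protected by the password above
r = redis.Redis(host='localhost', port=6379, password='redis_password')
print('Redis ping ->', r.ping())

# Kafka is mapped to 9092; send a throwaway message to a test topic
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('connectivity_test', b'hello')
producer.flush()
print('Kafka message sent')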

With that, the environment setup is complete.

The commands to stop the services:

# stop
docker-compose stop

# stop and remove the containers
docker-compose down

If one of the containers fails to start, a simple fix is to delete it and rebuild. Taking kafka as an example:

docker rm kafka
docker-compose up -d --build

1.3 Installing Python 3

PyFlink requires Python 3.5, 3.6, or 3.7; other versions will fail.

Miniconda is recommended for the Python environment: it has a small footprint, is isolated from the system environment, and makes managing multiple Python virtual environments easy.

Python 3 installation guides are easy to find online.
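
Before moving on, a quick sanity check of the interpreter can save some grief. The snippet below (a sketch, not part of the tutorial code) confirms the version is supported and that the apache-flink package is importable:

# Check that the active interpreter can run PyFlink.
import sys

assert sys.version_info[:2] in [(3, 5), (3, 6), (3, 7)], \
    'PyFlink requires Python 3.5, 3.6, or 3.7'

import pyflink  # raises ImportError if `pip install apache-flink` has not been run
print('Python', sys.version.split()[0], '- pyflink found at', pyflink.__path__[0])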

2. Running the Examples

First confirm that each of the following is in place:

  1. The Python environment is OK.
  2. Docker is up and the containers are running.
  3. Flink is correctly installed.

Once everything is ready, the local PyFlink development and test environment is complete. Let's get to the main topic.

Tutorial text: PyFlink: From Beginner to Mastery. The code can be found under the examples directory.

The tutorial currently provides 5 cases. If you are new to PyFlink, it is recommended to work through them in order:

  • 1. Batch processing: Word Count (a minimal sketch follows this list)
    • How to use PyFlink for batch processing
    • How to implement groupby logic with the Table API and the SQL API
    • How to read files from a filesystem (here, the local one) and store the processed results on another filesystem (also local in this case)
  • 2. User-defined functions (UDF)
    • How to import third-party Python dependencies into PyFlink
    • How to implement complex computation logic with UDFs (user-defined functions)
  • 3. Real-time CDC
    • How to build a real-time data warehouse with PyFlink
    • How to capture data changes from the business database's binlog in real time (mysql1 in this case) and upsert them into the backup warehouse (mysql2 in this case)
  • 4. Real-time leaderboard
    • How to implement stateful stream processing with PyFlink
    • How to import and use an aggregate-function jar written in Java from a Python environment
    • How to use sliding windows to build a leaderboard over a specified time range
  • 5. Online Machine Learning
    • How to do online machine learning with PyFlink
    • How to connect to Redis inside a UDF to load and save models
    • How to train a model inside a UDF
    • How to register and compute metrics inside a UDF
    • How to watch the metrics in real time on a web page to see how the algorithm is doing
    • How to build a Flask app that serves predictions from the latest model in Redis
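
For a first taste of case 1, here is a minimal sketch of a batch word count in PyFlink 1.11, written in the same SQL-DDL style as the tutorial but with illustrative paths and table names (a sketch, not the repository's exact script):

# Minimal PyFlink 1.11 batch word count (illustrative paths; treat as a sketch).
from pyflink.table import EnvironmentSettings, BatchTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=env_settings)

# A CSV source and a CSV sink on the local filesystem; adjust the paths for your machine.
t_env.execute_sql("""
    CREATE TABLE source (
        word STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///tmp/words.csv',
        'format' = 'csv'
    )
""")
t_env.execute_sql("""
    CREATE TABLE sink (
        word STRING,
        cnt BIGINT
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///tmp/word_count',
        'format' = 'csv'
    )
""")

# groupby expressed through the SQL API, then submit the job
# (the t_env.execute pattern matches how the repository's scripts are run via `flink run`).
t_env.sql_update("INSERT INTO sink SELECT word, COUNT(1) AS cnt FROM source GROUP BY word")
t_env.execute('word_count')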

Running a case is simple: cd into the case's directory and run the command below (replace xxx with the corresponding script name).

flink run -m localhost:8081 -py xxx.py

Next, head over to PyFlink: From Beginner to Mastery.

pyflink_learn's People

Contributors

uncleguanghui


pyflink_learn's Issues

Does PyFlink currently support calling a custom processWindowFunction from a jar?

Special thanks for your tutorial; it has helped me enormously! From the chapter 4 tutorial I see that TopN is implemented by calling an aggregateFunction from a jar, but my feature needs a full-window aggregation with a processWindowFunction over a tumbling window, and the output may be multiple rows or zero rows.
How should I write this case? Is it still a select in the Table API? I searched the official docs and really could not find an example like this 😳

How does example 5 incorporate feedback?

Hi. As I understand it, online machine learning is not just moving a model into Redis;
rather, the model should change in response to my feedback.
I think your example has this framework basically covered, except for how to provide feedback.
On the front-end page there are only three interaction points: input, output, and clear.
I think a box should be added where the user can enter the correct answer, and that result should be fed back into Flink to readjust the parameter values.

stream12.py: the male_top10 and female_top10 outputs are empty

+I({},{},2021-06-22T11:52:58,2021-06-22T11:53:58)
+I({},{},2021-06-22T11:52:59,2021-06-22T11:53:59)
+I({},{},2021-06-22T11:53,2021-06-22T11:54)
+I({},{},2021-06-22T11:53:01,2021-06-22T11:54:01)
+I({},{},2021-06-22T11:53:02,2021-06-22T11:54:02)
+I({},{},2021-06-22T11:53:03,2021-06-22T11:54:03)
+I({},{},2021-06-22T11:53:04,2021-06-22T11:54:04)
+I({},{},2021-06-22T11:53:05,2021-06-22T11:54:05)
+I({},{},2021-06-22T11:53:06,2021-06-22T11:54:06)
+I({},{},2021-06-22T11:53:07,2021-06-22T11:54:07)
+I({},{},2021-06-22T11:53:08,2021-06-22T11:54:08)

t_env problem

Hi, I get the following error when running stream.py; I'm on Python 3.8. Have you seen this error before?
File "/Users/rui/Documents/01181756gtu3/pyflink_learn-master/examples/5_online_machine_learning/stream.py", line 175, in <module>
t_env.register_function('train_and_predict', model)
File "/Users/rui/GaiWork/vv/lib/python3.8/site-packages/pyflink/table/table_environment.py", line 1051, in register_function
self._j_tenv.registerFunction(name, java_function)
File "/Users/rui/GaiWork/vv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "/Users/rui/GaiWork/vv/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 146, in deco
return f(*a, **kw)
File "/Users/rui/GaiWork/vv/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o13.registerFunction.

Learning notes

I recently went through the code of case 5 carefully and found that model_server.py has room for optimization.
It started because I felt the model used in the case was too simple and should be redone with at least a CNN. I expected it to be easy, but I still ran into quite a few problems along the way, and I would like to share and discuss them.

1. model_server.py takes the image's base64 string from the front-end form, but the code uses SVG, a rather uncommon format: it first saves the data as an SVG file on disk, then reads it back into memory and converts it to PNG. Not exactly elegant...

I looked into which formats the form can provide, and it turns out a PNG can be fetched directly, just by changing web.html to
var image=$("#canvas").jSignature("getData", "image");
where the original was
var image=$("#canvas").jSignature("getData", "svgbase64");

This directly yields the base64 encoding of a PNG. A bit of trivia learned from this example: the base64 encodings of different image formats are not interchangeable.

2. The image model_server.py receives is in RGBA format, and this taught me a hard lesson when writing my own code: going by past experience, the image read by cv2 was completely black. It turns out an image captured from the web page renders its content entirely through the fourth channel, the alpha (transparency) channel, so reading only channels 0, 1, and 2 shows no image at all.

At first I could not understand why the author created a blank canvas and pasted the image onto it; now I do.

After some fiddling, here is a more elegant approach:

https://stackoverflow.com/questions/55677216/how-to-convert-an-rgba-image-to-grayscale-in-python

Another bit of trivia: RGBA images should be tested separately, since many library functions do not support them well, and experience from RGB image processing often does not carry over.

You can tell the author has very rich hands-on experience; even if the approach is not elegant, it is a working solution, and it opened my eyes.

To sum up, the base64-to-ndarray conversion code is as follows:

import base64
import io

import numpy as np
from PIL import Image

io_img = io.BytesIO(base64.b64decode(imgstr))         # decode to bytes and wrap as an in-memory file
img = 255 - np.array(Image.open(io_img))[:, :, 3]     # open with PIL, take the alpha channel, invert black/white
Image.fromarray(img)    # view the image (in a Jupyter environment)

Or with cv2:

import base64
import cv2
import numpy as np

byte_img = base64.b64decode(imgstr)                    # image as raw bytes
flatten_img = np.frombuffer(byte_img, np.uint8)        # bytes to a flat uint8 array
img = cv2.imdecode(flatten_img, cv2.IMREAD_UNCHANGED)  # decode, keeping the alpha channel
img = 255 - img[:, :, 3]                               # take the alpha channel and invert

As I recall, the svglib package is not pleasant to install and requires some uncommon dependencies.

There is one more detail I learned in practice: the erosion operation must match the black/white inversion, and the order of the two matters.
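
As an illustration of that ordering point, a small sketch with a made-up 28x28 input:

# Erosion shrinks *white* regions, so whether you invert before eroding decides
# whether the stroke gets thinner or thicker. The 28x28 input here is made up.
import cv2
import numpy as np

img = np.full((28, 28), 255, np.uint8)        # white background
cv2.line(img, (5, 5), (22, 22), 0, 3)         # black stroke standing in for a digit

kernel = np.ones((3, 3), np.uint8)
thicker = cv2.erode(img, kernel)              # eroding the white area makes the black stroke thicker
thinner = 255 - cv2.erode(255 - img, kernel)  # invert, erode, invert back: the stroke gets thinner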

===============================================================

This also raised some other questions. For instance, models often need high-dimensional tensors, e.g. 3-D tensors like [[[],[],...],...], and Flink SQL does not seem to support them very well. The workaround is to flatten the tensor and hand it to a UDF, as sketched below.

In the process, the data I sent to Kafka looked like [-0.4242129623889923, -0.4242129623889923, 0.6449587345123291, ... ], a few thousand elements long. Importing it from Kafka into Flink failed: I declared the data type as ARRAY, but it was never recognized.
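
A minimal sketch of that flatten-then-restore idea (the shape and field name here are made up for illustration):

# Flatten a 3-D tensor into a 1-D list before producing to Kafka,
# then restore the original shape inside the UDF.
import json

import numpy as np

tensor = np.random.rand(1, 28, 28).astype(np.float32)
payload = json.dumps({'pixels': tensor.flatten().tolist()})  # a flat list fits an ARRAY column

# ... later, inside the UDF, after Flink hands over the flat array:
flat = json.loads(payload)['pixels']
restored = np.array(flat, dtype=np.float32).reshape(1, 28, 28)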

Fix for: Caused by: java.lang.NoSuchMethodError: 'void sun.misc.Unsafe.monitorEnter(java.lang.Object)'

I ran into this problem in Docker. From the material I could find, it is caused by the Java environment.

So, in the Linux environment, change the Java version to 8:

RUN apt-get update && \
    apt-get install -y openjdk-8-jdk

The Java version is then:

root@a3fe62309e9e:~# java -version
openjdk version "1.8.0_292"
OpenJDK Runtime Environment (build 1.8.0_292-8u292-b10-0ubuntu1~20.04-b10)
OpenJDK 64-Bit Server VM (build 25.292-b10, mixed mode)

The steps above resolve the problem.

problem

Hello, a question please. While studying case 4_window, stream12.py runs successfully on my local machine, but when I submit it to the Flink environment, it fails while connecting to the Kafka source with SyntaxError: invalid syntax. The Flink version is 1.12, the Python version 3.6.5. Looking forward to your answer, thank you!

The error is as follows:

Traceback (most recent call last):
File "", line 6, in <module>
File "/data/python3/anaconda3/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 858, in sql_query
j_table = self._j_tenv.sqlQuery(query)
File "/data/python3/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/data/python3/anaconda3/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco
return f(*a, **kw)
File "/data/python3/anaconda3/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o82.sqlQuery.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.source'.

How do I add the jar for reading JSON files as a source?

① When using t_env.connect(FileSystem()) to get data from a JSON file, how do I add the JSON jar? The official docs all add the dependency via Maven fields; how should this be handled in PyFlink?

② After the source has read the data, how do I output it in a standard way?
Both print and print_schema() only output the table's attributes and the like, not the actual data inside.

Thanks for any reply!!!

About python run vs. flink run

Hello,

May I ask one more question: why can other PyFlink examples found online be run directly with the Python interpreter (same environment), while your example code must be run with flink run -py?

Comparing with the official word count example, its last line has t_env.execute('xxx'); is it because of that?
Or because the table environment is set up as

env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=env_settings)

while the official example uses

    t_config = TableConfig()
    env = ExecutionEnvironment.get_execution_environment()
    t_env = BatchTableEnvironment.create(env, t_config)

Which of these is the reason?

Thanks for the guidance~

Some questions about example 5

Hello, I have recently been learning PyFlink through your examples and have benefited a lot, but I do not quite understand the UDF in example 5's stream.py, because I only see its definition and never a call. When I run it, no data is written into my Redis, which leads to an error. I would like to ask:

  1. How are the methods of the Model class invoked? I only see udf(train_and_predict) used in the final sink; how do methods like eval and open get called?
  2. Since PyFlink runs Flink through py4j, what is a good way to debug (i.e. setting breakpoints)?

Thanks!

Startup error: TypeError: 'JavaPackage' object is not callable

Hello, when running the code, initializing the environment raises an error:

env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()

Specifically, it already fails at EnvironmentSettings.new_instance():

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/octopi/PycharmProjects/pyflink/venv/lib/python3.8/site-packages/pyflink/table/environment_settings.py", line 187, in new_instance
    return EnvironmentSettings.Builder()
  File "/Users/octopi/PycharmProjects/pyflink/venv/lib/python3.8/site-packages/pyflink/table/environment_settings.py", line 46, in __init__
    gateway = get_gateway()
  File "/Users/octopi/PycharmProjects/pyflink/venv/lib/python3.8/site-packages/pyflink/java_gateway.py", line 67, in get_gateway
    _gateway.jvm.org.apache.flink.client.python.PythonEnvUtils.resetCallbackClient(
TypeError: 'JavaPackage' object is not callable

The error location is in the file java_gateway.py.

It feels like an environment problem, but I could not find any related material.
Could you spare a few minutes to answer? Much appreciated.

Where is the eval method executed?

In case 5_online_machine_learning, which line of the code executes eval, the model-training method?

I have never been able to figure it out; looking forward to your reply.

Problem running with version 1.12.0

When running the first example, word count, Flink 1.12.0 reports:

t_env.execute_sql(f"""
    CREATE TABLE source (
        id BIGINT,     -- ID
        word STRING    -- word
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file://{dir_word}',
        'format' = 'csv'
    )
""")  # syntax error reported here

The error:

  File "batch1.py", line 24
    """)
      ^
SyntaxError: invalid syntax

What is the problem here?

2_udf: error

When running flink run -m localhost:8081 -py batch.py, it fails with the following error:

2021-01-04 20:03:03
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
at sun.reflect.GeneratedMethodAccessor79.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Failed to create stage bundle factory!
at org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:197)
at org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:164)
at org.apache.flink.table.runtime.runners.python.scalar.AbstractGeneralPythonScalarFunctionRunner.open(AbstractGeneralPythonScalarFunctionRunner.java:65)
at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractStatelessFunctionOperator.java:186)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:143)
at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:131)
at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:88)
at org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:80)
at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:64)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to execute the command: python -c import pyflink;import os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))
output: Traceback (most recent call last):
File "<string>", line 1, in <module>
ModuleNotFoundError: No module named 'pyflink'

at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:198)
at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
at org.apache.flink.python.env.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:179)
at org.apache.flink.python.AbstractPythonFunctionRunner.createPythonExecutionEnvironment(AbstractPythonFunctionRunner.java:249)
at org.apache.flink.table.runtime.runners.python.AbstractPythonStatelessFunctionRunner.createExecutableStage(AbstractPythonStatelessFunctionRunner.java:158)
at org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:195)
... 16 more

Questions about how PyFlink jobs are executed

Very little documentation covers how to launch and execute a PyFlink job script; my own exploration does not feel right, so I would like to ask the experts for help. Thanks in advance.

Environment:
flink==1.15
python==3.7
apache-flink==1.15

Question 1: when running a py script with flink run, there is no way to specify the Python environment.
The official command for running a py job is ./bin/flink run --python examples/python/table/word_count.py, but Python projects each have their own virtualenv, and the official docs show no parameter for selecting a Python environment. Does PyFlink not support this?

Question 2: running the py script directly with python succeeds and produces output, but is this approach valid?
Running the script with the virtualenv's python executes successfully, but nothing in the process points at Flink, and the job never shows up in Flink, so it feels like the py job is running outside of Flink.

Question 3: on Windows, running the py script fails while creating the environment, and I have found no discussion of this anywhere.
Steps: run the py script directly with python.

Traceback (most recent call last):
  File "F:/PyDev/exercise/DB/pyflinker.py", line 14, in <module>
    stream_execution_environment=StreamExecutionEnvironment.get_execution_environment(),
  File "D:\.env\py37_data\lib\site-packages\pyflink\datastream\stream_execution_environment.py", line 802, in get_execution_environment
    gateway = get_gateway()
  File "D:\.env\py37_data\lib\site-packages\pyflink\java_gateway.py", line 62, in get_gateway
    _gateway = launch_gateway()
  File "D:\.env\py37_data\lib\site-packages\pyflink\java_gateway.py", line 106, in launch_gateway
    p = launch_gateway_server_process(env, args)
  File "D:\.env\py37_data\lib\site-packages\pyflink\pyflink_gateway_server.py", line 326, in launch_gateway_server_process
    stdin=PIPE, preexec_fn=preexec_fn, env=env)
  File "C:\Program Files\Python37\lib\subprocess.py", line 800, in __init__
    restore_signals, start_new_session)
  File "C:\Program Files\Python37\lib\subprocess.py", line 1207, in _execute_child
    startupinfo)
FileNotFoundError: [WinError 2] The system cannot find the file specified.

org.apache.beam.vendor.grpc.v1p21p0.io.grpc.netty.NettyServerTransport notifyTerminated

I use PyFlink and just ran stream.py in the IDE (not with the flink command; just a mouse right click and "run xx.py"). I get:

02, 2023 10:12:31 org.apache.beam.vendor.grpc.v1p21p0.io.grpc.netty.NettyServerTransport notifyTerminated
INFO: Transport failed
java.io.IOException: An established connection was aborted by the software in your host machine.
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378)
at org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
at org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1125)
at org.apache.beam.vendor.grpc.v1p21p0.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
at org.apache.beam.vendor.grpc.v1p21p0.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
at org.apache.beam.vendor.grpc.v1p21p0.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
at org.apache.beam.vendor.grpc.v1p21p0.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
at org.apache.beam.vendor.grpc.v1p21p0.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
at org.apache.beam.vendor.grpc.v1p21p0.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
at org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
at org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)

(The same "Transport failed" stack trace then repeats four more times, at 10:12:32, 10:12:36 (twice), and 10:12:42; the last three occurrences carry the message "An existing connection was forcibly closed by the remote host." instead.)

Traceback (most recent call last):
File "E:\python-project\fedStreaming\FedStreaming\pyflink_learn\examples\5_online_machine_learning\stream.py", line 228, in <module>
t_env.execute('Classifier Model Train')
File "C:\Users\12892\AppData\Local\Programs\Python\Python36\lib\site-packages\pyflink\table\table_environment.py", line 1057, in execute
return JobExecutionResult(self._j_tenv.execute(job_name))
File "C:\Users\12892\AppData\Local\Programs\Python\Python36\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Users\12892\AppData\Local\Programs\Python\Python36\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco
return f(*a, **kw)
File "C:\Users\12892\AppData\Local\Programs\Python\Python36\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o4.execute.
: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1717)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.lang.RuntimeException: Failed to create stage bundle factory!
at org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:197)
at org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:164)
at org.apache.flink.table.runtime.runners.python.scalar.AbstractGeneralPythonScalarFunctionRunner.open(AbstractGeneralPythonScalarFunctionRunner.java:65)
at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractStatelessFunctionOperator.java:186)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:143)
at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:131)
at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:88)
at org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:80)
at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:64)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Failed to execute the command: E:\pythonProject\FedStreaming\venv\Scripts\python.exe -c import pyflink;import os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))
output:
at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:198)
at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
at org.apache.flink.python.env.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:179)
at org.apache.flink.python.AbstractPythonFunctionRunner.createPythonExecutionEnvironment(AbstractPythonFunctionRunner.java:249)
at org.apache.flink.table.runtime.runners.python.AbstractPythonStatelessFunctionRunner.createExecutableStage(AbstractPythonStatelessFunctionRunner.java:158)
at org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:195)
... 16 more

Process finished with exit code 1
