Re: Re: Re: pyflink runtime error: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable

2021-04-15 Post by magichuang
Thanks! After several rounds of debugging it turned out the venv package I had built was the problem. It is fixed now and the job runs on the cluster. Thanks!
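
For reference, building a relocatable venv.zip for add_python_archive() usually looks roughly like the sketch below (the virtualenv-based steps and the listed packages are assumptions, not taken verbatim from this thread):

python3 -m pip install virtualenv
virtualenv --always-copy venv         # --always-copy avoids symlinks that break once the archive is shipped to YARN
source venv/bin/activate
pip install apache-flink==1.11.2 IPy  # assumed dependencies; they must match what the job imports
deactivate
zip -r venv.zip venv                  # referenced in the job as add_python_archive("venv.zip")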


> -- Original Message --
> From: "Dian Fu"
> Sent: 2021-04-15 10:32:49
> To: user-zh, magichu...@88.com
> Cc:
> Subject: Re: Re: pyflink runtime error: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable
>
> How about first running the job in local mode, to narrow things down? Judging from the error, it fails at compile time, so it should be reproducible on the machine you are submitting the job from.
>
> On Thu, Apr 15, 2021 at 10:24 AM magichuang wrote:
>
> > Hi, the cluster has 6 CentOS 7.8 cloud servers: 5 of them run java version "1.8.0_202" and 1 runs java version "16" 2021-03-16. Does that matter? I submitted from a "1.8.0_202" machine.
> >
> > Submit command: /opt/flink-1.11.2/bin/flink run -m yarn-cluster -ynm traffic -ys 1 -ytm 1024m -p 1 -py traffic.py
> >
> >
> >
> >
> > > -- Original Message --
> > > From: "Dian Fu"
> > > Sent: 2021-04-14 23:11:57
> > > To: user-zh
> > > Cc:
> > > Subject: Re: pyflink runtime error: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable
> > >
> > > What is your JDK version? This looks like a Java environment issue. There is a similar question [1]; see whether it helps.
> > >
> > > [1]
> > >
> > https://stackoverflow.com/questions/41265266/how-to-solve-inaccessibleobjectexception-unable-to-make-member-accessible-m
> > >
> > > On Wed, Apr 14, 2021 at 4:58 PM magichuang wrote:
> > >
> > > > Flink version: 1.11.2, Python version: 3.6, apache-flink==1.11.2, running Flink on YARN in per-job mode.
> > > >
> > > > The program is written with pyflink: it reads data from Kafka, uses a UDF to check whether an IP falls within an IP range (via the third-party package IPy), and then writes the result back to Kafka.
> > > >
> > > >
> > > >
> > > >
> > > > Main code:
> > > >
> > > >
> > t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size',
> > > > '128m')
> > > >
> > > > t_env.get_config().get_configuration().set_string("pipeline.jars",
> > > >
> > "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
> > > >
> > > >
> > t_env.get_config().get_configuration().set_string("pipeline.classpaths",
> > > >
> > "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
> > > >
> > > >
> > > >
> > > >
> > > > t_env.add_python_archive("venv.zip")
> > > >
> > > > t_env.get_config().set_python_executable("venv.zip/venv/bin/python")
> > > >
> > > >
> > > >
> > > >
> > > > @udf(input_types=[DataTypes.STRING(), DataTypes.STRING()],
> > > > result_type=DataTypes.INT())
> > > >
> > > > def judge_ip(src_ip, dst_ip):
> > > >
> > > > import IPy
> > > >
> > > > .
> > > >
> > > > t_env.register_function('judge_ip', judge_ip)
> > > >
> > > >
> > > >
> > > > The main error message is below:
> > > >
> > > > Traceback (most recent call last):
> > > >
> > > > File "traffic-tuple-sf.py", line 59, in
> > > >
> > > > t_env.register_function('judge_ip', judge_ip)
> > > >
> > > > File
> > > >
> > "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py",
> > > > line 876, in register_function
> > > >
> > > > File
> > > >
> > "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> > > > line 1286, in __call__
> > > >
> > > > File
> > > > "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> > line
> > > > 147, in deco
> > > >
> > > > File
> > > > "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
> > line
> > > > 328, in get_return_value
> > > >
> > > > py4j.protocol.Py4JJavaError: An error occurred while calling
> > > > o5.registerFunction.
> > > >
> > > > : org.apache.flink.table.api.ValidationException: Function class 'class
> > > > org.apache.flink.table.functions.python.PythonScalarFunction' is not
> > > > serializable. Make sure that the class is self-contained (i.e. no
> > > > references to outer classes) and all inner fields are serializable as
> > well.
> > > >
> >

Re: Re: pyflink runtime error: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable

2021-04-14 Post by Dian Fu
How about first running the job in local mode, to narrow things down? Judging from the error, it fails at compile time, so it should be reproducible on the machine you are submitting the job from.
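
A minimal way to try that, assuming the script has no YARN-specific setup, is to run it so that PyFlink uses a local mini-cluster instead of YARN (the paths below are placeholders):

source venv/bin/activate      # the same virtualenv that is shipped to the cluster
python traffic.py             # running the script directly executes it against a local mini-cluster
# or, against a locally started standalone cluster:
# /opt/flink-1.11.2/bin/flink run -py traffic.py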

On Thu, Apr 15, 2021 at 10:24 AM magichuang  wrote:

> Hi, the cluster has 6 CentOS 7.8 cloud servers: 5 of them run java version "1.8.0_202" and 1 runs java version "16" 2021-03-16. Does that matter? I submitted from a "1.8.0_202" machine.
>
> Submit command: /opt/flink-1.11.2/bin/flink run -m yarn-cluster -ynm traffic -ys 1 -ytm 1024m -p 1 -py traffic.py
>
>
>
>
> > -- Original Message --
> > From: "Dian Fu"
> > Sent: 2021-04-14 23:11:57
> > To: user-zh
> > Cc:
> > Subject: Re: pyflink runtime error: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable
> >
> > What is your JDK version? This looks like a Java environment issue. There is a similar question [1]; see whether it helps.
> >
> > [1]
> >
> https://stackoverflow.com/questions/41265266/how-to-solve-inaccessibleobjectexception-unable-to-make-member-accessible-m
> >
> > On Wed, Apr 14, 2021 at 4:58 PM magichuang wrote:
> >
> > > Flink version: 1.11.2, Python version: 3.6, apache-flink==1.11.2, running Flink on YARN in per-job mode.
> > >
> > > The program is written with pyflink: it reads data from Kafka, uses a UDF to check whether an IP falls within an IP range (via the third-party package IPy), and then writes the result back to Kafka.
> > >
> > >
> > >
> > >
> > > Main code:
> > >
> > >
> t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size',
> > > '128m')
> > >
> > > t_env.get_config().get_configuration().set_string("pipeline.jars",
> > >
> "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
> > >
> > >
> t_env.get_config().get_configuration().set_string("pipeline.classpaths",
> > >
> "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
> > >
> > >
> > >
> > >
> > > t_env.add_python_archive("venv.zip")
> > >
> > > t_env.get_config().set_python_executable("venv.zip/venv/bin/python")
> > >
> > >
> > >
> > >
> > > @udf(input_types=[DataTypes.STRING(), DataTypes.STRING()],
> > > result_type=DataTypes.INT())
> > >
> > > def judge_ip(src_ip, dst_ip):
> > >
> > > import IPy
> > >
> > > .
> > >
> > > t_env.register_function('judge_ip', judge_ip)
> > >
> > >
> > >
> > > The main error message is below:
> > >
> > > Traceback (most recent call last):
> > >
> > > File "traffic-tuple-sf.py", line 59, in
> > >
> > > t_env.register_function('judge_ip', judge_ip)
> > >
> > > File
> > >
> "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py",
> > > line 876, in register_function
> > >
> > > File
> > >
> "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> > > line 1286, in __call__
> > >
> > > File
> > > "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line
> > > 147, in deco
> > >
> > > File
> > > "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
> line
> > > 328, in get_return_value
> > >
> > > py4j.protocol.Py4JJavaError: An error occurred while calling
> > > o5.registerFunction.
> > >
> > > : org.apache.flink.table.api.ValidationException: Function class 'class
> > > org.apache.flink.table.functions.python.PythonScalarFunction' is not
> > > serializable. Make sure that the class is self-contained (i.e. no
> > > references to outer classes) and all inner fields are serializable as
> well.
> > >
> > > at
> > >
> org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:349)
> > >
> > > at
> > >
> org.apache.flink.table.functions.UserDefinedFunctionHelper.prepareInstance(UserDefinedFunctionHelper.java:204)
> > >
> > > at
> > >
> org.apache.flink.table.catalog.FunctionCatalog.registerTempSystemScalarFunction(FunctionCatalog.java:383)
> > >
> > > at
> > >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.registerFunction(TableEnvironmentImpl.java:357)
> > >
> > > at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> > > Method)
> > >
> > > at
> > >
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
> > >
> > > 

Re: Re: pyflink runtime error: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable

2021-04-14 Post by magichuang
Hi, the cluster has 6 CentOS 7.8 cloud servers: 5 of them run java version "1.8.0_202" and 1 runs java version "16" 2021-03-16. Does that matter? I submitted from a "1.8.0_202" machine.

Submit command: /opt/flink-1.11.2/bin/flink run -m yarn-cluster -ynm traffic -ys 1 -ytm 1024m -p 1 -py traffic.py




> -- Original Message --
> From: "Dian Fu"
> Sent: 2021-04-14 23:11:57
> To: user-zh
> Cc:
> Subject: Re: pyflink runtime error: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable
>
> What is your JDK version? This looks like a Java environment issue. There is a similar question [1]; see whether it helps.
>
> [1]
> https://stackoverflow.com/questions/41265266/how-to-solve-inaccessibleobjectexception-unable-to-make-member-accessible-m
>
> On Wed, Apr 14, 2021 at 4:58 PM magichuang wrote:
>
> > Flink version: 1.11.2, Python version: 3.6, apache-flink==1.11.2, running Flink on YARN in per-job mode.
> >
> > The program is written with pyflink: it reads data from Kafka, uses a UDF to check whether an IP falls within an IP range (via the third-party package IPy), and then writes the result back to Kafka.
> >
> >
> >
> >
> > Main code:
> >
> > t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size',
> > '128m')
> >
> > t_env.get_config().get_configuration().set_string("pipeline.jars",
> > "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
> >
> > t_env.get_config().get_configuration().set_string("pipeline.classpaths",
> > "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
> >
> >
> >
> >
> > t_env.add_python_archive("venv.zip")
> >
> > t_env.get_config().set_python_executable("venv.zip/venv/bin/python")
> >
> >
> >
> >
> > @udf(input_types=[DataTypes.STRING(), DataTypes.STRING()],
> > result_type=DataTypes.INT())
> >
> > def judge_ip(src_ip, dst_ip):
> >
> > import IPy
> >
> > .
> >
> > t_env.register_function('judge_ip', judge_ip)
> >
> >
> >
> > The main error message is below:
> >
> > Traceback (most recent call last):
> >
> > File "traffic-tuple-sf.py", line 59, in
> >
> > t_env.register_function('judge_ip', judge_ip)
> >
> > File
> > "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py",
> > line 876, in register_function
> >
> > File
> > "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> > line 1286, in __call__
> >
> > File
> > "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line
> > 147, in deco
> >
> > File
> > "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line
> > 328, in get_return_value
> >
> > py4j.protocol.Py4JJavaError: An error occurred while calling
> > o5.registerFunction.
> >
> > : org.apache.flink.table.api.ValidationException: Function class 'class
> > org.apache.flink.table.functions.python.PythonScalarFunction' is not
> > serializable. Make sure that the class is self-contained (i.e. no
> > references to outer classes) and all inner fields are serializable as well.
> >
> > at
> > org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:349)
> >
> > at
> > org.apache.flink.table.functions.UserDefinedFunctionHelper.prepareInstance(UserDefinedFunctionHelper.java:204)
> >
> > at
> > org.apache.flink.table.catalog.FunctionCatalog.registerTempSystemScalarFunction(FunctionCatalog.java:383)
> >
> > at
> > org.apache.flink.table.api.internal.TableEnvironmentImpl.registerFunction(TableEnvironmentImpl.java:357)
> >
> > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> > Method)
> >
> > at
> > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
> >
> > at
> > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >
> > at java.base/java.lang.reflect.Method.invoke(Method.java:567)
> >
> > at
> > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> >
> > at
> > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> >
> > at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> >
> > at
> > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> >
> > at
>

Re: pyflink runtime error: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable

2021-04-14 Post by Dian Fu
What is your JDK version? This looks like a Java environment issue. There is a similar question [1]; see whether it helps.

[1]
https://stackoverflow.com/questions/41265266/how-to-solve-inaccessibleobjectexception-unable-to-make-member-accessible-m
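
Purely as an illustration of the module-access restriction that [1] is about (this is an assumption, not the fix this thread ended up with): on JDK 9+ the usual workaround there is to open java.lang to unnamed modules on the affected JVM, for example:

export JAVA_TOOL_OPTIONS="--add-opens java.base/java.lang=ALL-UNNAMED"
# Not needed on a JDK 8 client, which is what most machines in this thread run.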

On Wed, Apr 14, 2021 at 4:58 PM magichuang  wrote:

> Flink version: 1.11.2, Python version: 3.6, apache-flink==1.11.2, running Flink on YARN in per-job mode.
>
> The program is written with pyflink: it reads data from Kafka, uses a UDF to check whether an IP falls within an IP range (via the third-party package IPy), and then writes the result back to Kafka.
>
>
>
>
> Main code:
>
> t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size',
> '128m')
>
> t_env.get_config().get_configuration().set_string("pipeline.jars",
> "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
>
> t_env.get_config().get_configuration().set_string("pipeline.classpaths",
> "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
>
>
>
>
> t_env.add_python_archive("venv.zip")
>
> t_env.get_config().set_python_executable("venv.zip/venv/bin/python")
>
>
>
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.STRING()],
> result_type=DataTypes.INT())
>
> def judge_ip(src_ip, dst_ip):
>
> import IPy
>
> .
>
> t_env.register_function('judge_ip', judge_ip)
>
>
>
> The main error message is below:
>
> Traceback (most recent call last):
>
> File "traffic-tuple-sf.py", line 59, in 
>
> t_env.register_function('judge_ip', judge_ip)
>
> File
> "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py",
> line 876, in register_function
>
> File
> "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> line 1286, in __call__
>
> File
> "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line
> 147, in deco
>
> File
> "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line
> 328, in get_return_value
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o5.registerFunction.
>
> : org.apache.flink.table.api.ValidationException: Function class 'class
> org.apache.flink.table.functions.python.PythonScalarFunction' is not
> serializable. Make sure that the class is self-contained (i.e. no
> references to outer classes) and all inner fields are serializable as well.
>
> at
> org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:349)
>
> at
> org.apache.flink.table.functions.UserDefinedFunctionHelper.prepareInstance(UserDefinedFunctionHelper.java:204)
>
> at
> org.apache.flink.table.catalog.FunctionCatalog.registerTempSystemScalarFunction(FunctionCatalog.java:383)
>
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.registerFunction(TableEnvironmentImpl.java:357)
>
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
>
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.base/java.lang.reflect.Method.invoke(Method.java:567)
>
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>
> at java.base/java.lang.Thread.run(Thread.java:831)
>
> Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make
> field private final byte[] java.lang.String.value accessible: module
> java.base does not "opens java.lang" to unnamed module @1311d9fb
>
> at
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)
>
> at
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
>
> at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:177)
>
> at java.base/java.lang.reflect.Field.setAccessible(Field.java:171)
>
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:104)
>
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>
> at
> org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:346)
>
> ... 14 more
>
>
>
>
> Could anyone please take a look at what is going wrong here and how it should be fixed? Thanks!
>
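
For completeness, a self-contained sketch of the judge_ip UDF described above. Only the decorator signature, the IPy import, and the registration call are taken from the quoted code; the concrete IP range and the 0/1 return convention are made-up placeholders:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.INT())
def judge_ip(src_ip, dst_ip):
    # Import inside the function so the dependency is resolved from the shipped venv.zip on the workers.
    import IPy
    ip_range = IPy.IP('10.0.0.0/8')   # placeholder range, not from the thread
    return 1 if src_ip in ip_range or dst_ip in ip_range else 0

# Registered exactly as in the quoted code:
# t_env.register_function('judge_ip', judge_ip)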