Re: PyFlink job submission

2022-11-25 Thread Xingbo Huang
Hi,

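According to the error message, pyflink could not be imported when the command ./python3.6.8.zip/bin/python3 was executed. Please check locally whether pyflink was actually installed successfully in this virtual environment.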
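
One way to run that check is to execute a small script with the interpreter inside the unpacked archive (the same bin/python3 that Flink invokes). This is only a minimal sketch; the helper name module_available is made up for illustration and is not part of PyFlink.

```python
import importlib.util


def module_available(name: str) -> bool:
    """Return True if the interpreter running this script can locate `name`."""
    return importlib.util.find_spec(name) is not None


if __name__ == "__main__":
    # Run this with the archive's own interpreter, not the system one.
    print("pyflink importable:", module_available("pyflink"))
```

If this prints False for the archived interpreter, the environment was packaged without pyflink, which would match the ModuleNotFoundError in the log.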

Best,
Xingbo

程龙 <13162790...@163.com> wrote on Fri, Nov 25, 2022 at 16:02:

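> I am submitting a job with PyFlink; the deployment mode is on YARN.
> 1. Without operators such as Map, the job can be submitted and runs successfully with the following parameters: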
> .flink run  -ynm pytest   -m yarn-cluster  -pyclientexec ***/python3
>  -pyexec ***/python3  -pyarch *** /python3.6.8.zip   -py  demo.py
>
>
> 2. When the map operator is used, submission succeeds, but the job fails at runtime. The error log is as follows:
> .flink run  -ynm pytest   -m yarn-cluster  -pyclientexec ***/python3
>  -pyexec ***/python3  -pyarch *** /python3.6.8.zip   -py  demo.py
> Error:
> Caused by: java.io.IOException: Failed to execute the command:
> ./python3.6.8.zip/bin/python3 -c import pyflink;import
> os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)),
> 'bin'))
> output: Traceback (most recent call last):
>   File "", line 1, in 
> ModuleNotFoundError: No module named 'pyflink'
> at
> org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:211)
> at
> org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:154)
> at
> org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:156)
> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:398)
> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:246)
> at
> org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:539)
>
>
> How should the environment variables be set so that the job runs correctly?


Re: Re: Exception when starting a Python job in Flink: undefined symbol: _Py_LegacyLocaleDetected

2022-11-22 Thread Xingbo Huang
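A venv built with conda bundles some of the machine's underlying C libraries, so it is more complete. A virtual environment built with Python's venv package may run into problems when it is moved across machines.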
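
As a rough way to inspect this: on Linux, `python3 -m venv` by default creates bin/python as a symlink to the interpreter of the machine that built the environment, so the environment is not self-contained. The sketch below only reports what a given interpreter path points to; the function name and the example path are illustrative assumptions.

```python
import os


def describe_interpreter(path: str) -> dict:
    """Report whether `path` is a symlink and where it finally resolves to."""
    return {
        "path": path,
        "is_symlink": os.path.islink(path),
        "resolved": os.path.realpath(path),
    }


if __name__ == "__main__":
    # Example path based on the thread; adjust it to the local layout.
    print(describe_interpreter("jxy_venv/bin/python"))
```

If the resolved path lies outside the environment directory, the interpreter (and the shared libraries it links against) comes from the build machine rather than from the packaged archive.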

Best,
Xingbo

RS wrote on Wed, Nov 23, 2022 at 09:25:

> Hi,
> I built it with the python command and did not use conda. That should not matter, should it?
> python3 -m venv jxy_venv
>
>
> I started a single-node Flink for testing on the local machine, which has a Python environment, and the test ran successfully.
>
>
>
> Thanks
>
>
>
>
>
> At 2022-11-22 15:39:48, "Xingbo Huang" wrote:
> >Hi RS,
> >
> >Did you build the venv with conda? You can refer to the PyFlink documentation on preparing the environment [1].
> >
> >Best,
> >Xingbo
> >
> >[1]
> >
> https://pyflink.readthedocs.io/en/latest/getting_started/installation/prepare.html#create-a-virtual-environment-using-conda
> >
> >RS wrote on Tue, Nov 22, 2022 at 15:14:
> >
> >> Hi,
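> >> Flink version: 1.15.1
> >>
> >>
> >> Machine A: a Python virtual environment (version 3.6.8) was created on machine A, flink and other Python packages were installed, and it was then packaged as a ZIP, jxy_venv.zip, and uploaded to HDFS
> >> Machine B: the host has no Python environment; Flink runs in Docker on K8s, and the Docker container has no Python environment either
> >> Machine C: has the Flink client and a Python environment, and is responsible for launching the job
> >>
> >>
> >> Launch command: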
> >> ./bin/flink run -Dexecution.runtime-mode=BATCH -d -m 192.168.1.2:8081
> -n
> >> -py /xxx/main.py -pyfs hdfs://xxx/config.py -pyarch
> >> hdfs://xxx/jxy_venv.zip#venv -pyclientexec venv/jxy_venv/bin/python
> >>
> >>
> >> Error message:
> >> ...
> >> venv/jxy_venv/bin/python: symbol lookup error: venv/jxy_venv/bin/python:
> >> undefined symbol: _Py_LegacyLocaleDetected
> >> org.apache.flink.client.program.ProgramAbortException:
> >> java.lang.RuntimeException: Python process exits with code: 127
> >> at
> >> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> at
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> at
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:498)
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> >> at
> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
> >> at
> >> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
> >> at java.security.AccessController.doPrivileged(Native Method)
> >> at javax.security.auth.Subject.doAs(Subject.java:422)
> >> at
> >>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
> >> at
> >>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >> at
> >> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
> >> Caused by: java.lang.RuntimeException: Python process exits with code:
> 127
> >> at
> >> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
> >> ... 16 more
> >>
> >>
> >> How should this situation be handled?
> >> Does the python command have to be installed in the Flink environment?
> >>
> >>
> >> Thanks
> >>
> >>
>


Re: Exception when starting a Python job in Flink: undefined symbol: _Py_LegacyLocaleDetected

2022-11-21 Thread Xingbo Huang
Hi RS,

Did you build the venv with conda? You can refer to the PyFlink documentation on preparing the environment [1].

Best,
Xingbo

[1]
https://pyflink.readthedocs.io/en/latest/getting_started/installation/prepare.html#create-a-virtual-environment-using-conda

RS wrote on Tue, Nov 22, 2022 at 15:14:

> Hi,
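> Flink version: 1.15.1
>
>
> Machine A: a Python virtual environment (version 3.6.8) was created on machine A, flink and other Python packages were installed, and it was then packaged as a ZIP, jxy_venv.zip, and uploaded to HDFS
> Machine B: the host has no Python environment; Flink runs in Docker on K8s, and the Docker container has no Python environment either
> Machine C: has the Flink client and a Python environment, and is responsible for launching the job
>
>
> Launch command: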
> ./bin/flink run -Dexecution.runtime-mode=BATCH -d -m 192.168.1.2:8081 -n
> -py /xxx/main.py -pyfs hdfs://xxx/config.py -pyarch
> hdfs://xxx/jxy_venv.zip#venv -pyclientexec venv/jxy_venv/bin/python
>
>
> Error message:
> ...
> venv/jxy_venv/bin/python: symbol lookup error: venv/jxy_venv/bin/python:
> undefined symbol: _Py_LegacyLocaleDetected
> org.apache.flink.client.program.ProgramAbortException:
> java.lang.RuntimeException: Python process exits with code: 127
> at
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
> Caused by: java.lang.RuntimeException: Python process exits with code: 127
> at
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
> ... 16 more
>
>
> How should this situation be handled?
> Does the python command have to be installed in the Flink environment?
>
>
> Thanks
>
>


Re: Facing Issue in running Python Flink Program in flink cluster (Version=1.15.2)

2022-11-21 Thread Xingbo Huang
Hi Harshit,

According to the stack trace you provided, I guess you define your Python
function in the main file, and the Python function imports xgboost
globally. The reason for the error is that the xgboost library is difficult
for cloudpickle to serialize. There are two ways to solve this:

1. Move `import xgboost` to the inside of the Python function.

2. Move the Python function to another Python file, and then add that
Python file as a dependency via `add_python_file`[1].
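
A minimal sketch of option 1, assuming a hypothetical function named score. To keep the example runnable without xgboost installed, the standard-library module `statistics` stands in for `xgboost`; in a real job the deferred line would be `import xgboost`.

```python
def score(features):
    # Deferred import: the module is looked up when the function runs,
    # so the function no longer refers to a module-level global when it
    # is serialized. `statistics` is only a stand-in for `xgboost` here.
    import statistics as heavy_lib

    return heavy_lib.mean(features)


if __name__ == "__main__":
    print(score([1.0, 2.0, 3.0]))
```

The import cost is paid on the first call only; later calls find the module already loaded in the worker's interpreter.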

Best,
Xingbo

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/dependency_management/#python-libraries

harshit.varsh...@iktara.ai wrote on Mon, Nov 21, 2022 at 20:51:

> Dear Team,
>
> I am facing an issue while running a PyFlink program in a Flink cluster: it
> stops running while reading the machine learning model.
>
>
>
> This is the error :
>
>
>
> ./bin/flink run --python /home/desktop/ProjectFiles/test_new.py
>
>
>
> Job has been submitted with JobID 0a561cb330eeac5aa7b40ac047d3c6a3
>
> /home/desktop/.local/lib/python3.8/site-packages/sklearn/base.py:329:
> UserWarning: Trying to unpickle estimator LabelEncoder from version 1.1.1
> when using version 1.1.2. This might lead to breaking code or invalid
> results. Use at your own risk. For more info please refer to:
>
>
> https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
>
>   warnings.warn(
>
> /home/desktop/.local/lib/python3.8/site-packages/xgboost/compat.py:31:
> FutureWarning: pandas.Int64Index is deprecated and will be removed from
> pandas in a future version. Use pandas.Index with the appropriate dtype
> instead.
>
>   from pandas import MultiIndex, Int64Index
>
> /home/desktop/.local/lib/python3.8/site-packages/pkg_resources/__init__.py:116:
> PkgResourcesDeprecationWarning: 0.1.36ubuntu1 is an invalid version and
> will not be supported in a future release
>
>   warnings.warn(
>
> /home/desktop/.local/lib/python3.8/site-packages/pkg_resources/__init__.py:116:
> PkgResourcesDeprecationWarning: 0.23ubuntu1 is an invalid version and will
> not be supported in a future release
>
>   warnings.warn(
>
> Traceback (most recent call last):
>
>   File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
>
> return _run_code(code, main_globals, None,
>
>   File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
>
> exec(code, run_globals)
>
>   File
> "/tmp/pyflink/bc5f6233-c53d-45a0-b8f2-21b41ea0c6ad/71b84799-5210-4a85-ae80-9efe0fb192f1/test_new.py",
> line 510, in 
>
> new_user_ratings(envir)
>
>   File
> "/tmp/pyflink/bc5f6233-c53d-45a0-b8f2-21b41ea0c6ad/71b84799-5210-4a85-ae80-9efe0fb192f1/test_new.py",
> line 504, in new_user_ratings
>
> environment.execute('new_user_ratings')
>
>   File
> "/home/desktop/Downloads/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py",
> line 761, in execute
>
>   File
> "/home/desktop/Downloads/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py",
> line 1321, in __call__
>
>   File
> "/home/desktop/Downloads/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line 146, in deco
>
>   File
> "/home/desktop/Downloads/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/protocol.py",
> line 326, in get_return_value
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o8.execute.
>
> : java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 0a561cb330eeac5aa7b40ac047d3c6a3)
>
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>
> at
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:173)
>
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:123)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>
> at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>
> at
> 

Re: Dependency resolution issue with apache-flink 1.16.0 python package.

2022-11-16 Thread Xingbo Huang
Hi Yogi,

I think the problem comes from poetry depending on the metadata in PyPI.
This problem has been reported in
https://issues.apache.org/jira/browse/FLINK-29817 and I will fix it in
1.16.1.
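
For readers following the quoted trace below, the conflict reported by poetry comes down to one version pin lying outside a declared range. The sketch below is not how poetry resolves dependencies; it is just a hand-rolled numeric comparison (the helper names are made up) using the numbers from the trace.

```python
def parse(version: str) -> tuple:
    """Turn a dotted numeric version such as '1.21.4' into (1, 21, 4)."""
    return tuple(int(part) for part in version.split("."))


def in_range(version: str, lower: str, upper: str) -> bool:
    """True if lower <= version < upper, comparing numeric components."""
    return parse(lower) <= parse(version) < parse(upper)


if __name__ == "__main__":
    # numpy pin declared for pemja 0.2.6 vs. the range declared for
    # apache-flink 1.16.0, both taken from the quoted poetry output.
    print(in_range("1.21.4", "1.14.3", "1.20"))  # False: the pin is outside the range
```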

Best,
Xingbo

Yogi Devendra wrote on Thu, Nov 17, 2022 at 06:21:

> Dear community/maintainers,
>
> Thanks for the lovely platform of Apache Flink.
> I got the following error when adding the apache-flink 1.16.0 dependency to
> my Python project.
>
> Given below is the stack trace for further investigation.
> When I tried using a lower version (1.15.2) for the same, I was able to move
> forward.
>
> Can you please confirm if this is a bug or I am doing something wrong?
>
> -
>
> learn-flink git:(master) ✗ poetry add apache-flink --verbose
> Using virtualenv: /Users/d.vyavahare/repo/
> github.com/yogidevendra/python-warm-up/learn-flink/.venv
> Using version ^1.16.0 for apache-flink
>
> Updating dependencies
> Resolving dependencies... (2.4s)
>
>   SolveFailure
>
>   Because pemja (0.2.6) depends on numpy (1.21.4)
>and apache-flink (1.16.0) depends on numpy (>=1.14.3,<1.20), pemja
> (0.2.6) is incompatible with apache-flink (1.16.0).
>   And because apache-flink (1.16.0) depends on pemja (0.2.6), apache-flink
> is forbidden.
>   So, because no versions of apache-flink match >1.16.0,<2.0.0
>and learn-flink depends on apache-flink (^1.16.0), version solving
> failed.
>
>   at ~/Library/Application
> Support/pypoetry/venv/lib/python3.8/site-packages/poetry/mixology/version_solver.py:364
> in _resolve_conflict
>   360│ )
>   361│ self._log(f'! which is caused by
> "{most_recent_satisfier.cause}"')
>   362│ self._log(f"! thus: {incompatibility}")
>   363│
> → 364│ raise SolveFailure(incompatibility)
>   365│
>   366│ def _choose_package_version(self) -> str | None:
>   367│ """
>   368│ Tries to select a version of a required package.
>
> The following error occurred when trying to handle this error:
>
>
>   SolverProblemError
>
>   Because pemja (0.2.6) depends on numpy (1.21.4)
>and apache-flink (1.16.0) depends on numpy (>=1.14.3,<1.20), pemja
> (0.2.6) is incompatible with apache-flink (1.16.0).
>   And because apache-flink (1.16.0) depends on pemja (0.2.6), apache-flink
> is forbidden.
>   So, because no versions of apache-flink match >1.16.0,<2.0.0
>and learn-flink depends on apache-flink (^1.16.0), version solving
> failed.
>
>   at ~/Library/Application
> Support/pypoetry/venv/lib/python3.8/site-packages/poetry/puzzle/solver.py:159
> in _solve
>   155│ packages = result.packages
>   156│ except OverrideNeeded as e:
>   157│ return
> self.solve_in_compatibility_mode(e.overrides, use_latest=use_latest)
>   158│ except SolveFailure as e:
> → 159│ raise SolverProblemError(e)
>   160│
>   161│ combined_nodes =
> depth_first_search(PackageNode(self._package, packages))
>   162│ results = dict(aggregate_package_nodes(nodes) for nodes
> in combined_nodes)
>   163│
>
> 
>
> ~ Yogi
>


[ANNOUNCE] Apache Flink 1.16.0 released

2022-10-28 Thread Xingbo Huang
The Apache Flink community is very happy to announce the release of Apache
Flink 1.16.0, which is the first release for the Apache Flink 1.16 series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the
improvements for this release:
https://flink.apache.org/news/2022/10/28/1.16-announcement.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351275

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Chesnay, Martijn, Godfrey & Xingbo


Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-10 Thread Xingbo Huang
+1 for reverting these changes in Flink 1.16, so I will cancel 1.16.0-rc1.
+1 for `numXXXSend` as the alias of `numXXXOut` in 1.15.3.

Best,
Xingbo

Chesnay Schepler wrote on Mon, Oct 10, 2022 at 19:13:

> > I’m with Xintong’s idea to treat numXXXSend as an alias of numXXXOut
>
> But that's not possible. If it were that simple there would have never
> been a need to introduce another metric in the first place.
>
> It's a rather fundamental issue with how the new sinks work, in that they
> emit data to the external system (usually considered as "numRecordsOut" of
> sinks) while _also_ sending data to a downstream operator (usually
> considered as "numRecordsOut" of tasks).
> The original issue was that the numRecordsOut of the sink counted both
> (which is completely wrong).
>
> A new metric was always required; otherwise you inevitably end up breaking
> *some* semantic.
> Adding a new metric for what the sink writes to the external system is,
> for better or worse, more consistent with how these metrics usually work in
> Flink.
>
> On 10/10/2022 12:45, Qingsheng Ren wrote:
>
> Thanks everyone for joining the discussion!
>
> > Do you have any idea what has happened in the process here?
>
> The discussion in this PR [1] shows some details and could be helpful to
> understand the original motivation of the renaming. We do have a test case
> for guarding metrics but unfortunately the case was also modified so the
> defense was broken.
>
> I think the reason why both the developer and the reviewer forgot to
> trigger a discussion and gave a green pass on the change is that metrics
> are too “trivial” to be noticed as public APIs. As mentioned by Martijn I
> couldn’t find a place noting that metrics are public APIs and should be
> treated carefully while contributing and reviewing.
>
> IMHO three actions could be made to prevent this kind of changes in the
> future:
>
> a. Add test case for metrics (which we already have in SinkMetricsITCase)
> b. We emphasize that any public-interface breaking changes should be
> proposed by a FLIP or discussed in mailing list, and should be listed in
> the release note.
> c. We remind contributors and reviewers about what should be considered as
> public API, and include metric names in it.
>
> For b and c these two pages [2][3] might be proper places.
>
> About the patch to revert this, it looks like we have a consensus on 1.16.
> As for 1.15, I think it’s worth triggering a minor version. I haven’t seen
> complaints about this so far, so it should be OK to fix the situation
> asap. I’m with Xintong’s idea to treat numXXXSend as an alias of numXXXOut,
> considering that some users may have already adapted their
> systems to the new naming, and to have another internal metric for reflecting
> the number of outgoing committable batches (actually the numRecordsIn of the
> sink committer operator should be carrying this info already).
>
> [1] https://github.com/apache/flink/pull/18825
> [2] https://flink.apache.org/contributing/contribute-code.html
> [3] https://flink.apache.org/contributing/reviewing-prs.html
>
> Best,
> Qingsheng
> On Oct 10, 2022, 17:40 +0800, Xintong Song wrote:
>
> +1 for reverting these changes in Flink 1.16.
>
> For 1.15.3, can we make these metrics available via both names (numXXXOut
> and numXXXSend)? In this way we don't break it for those who already
> migrated to 1.15 and numXXXSend. That means we still need to change
> SinkWriterOperator to use another metric name in 1.15.3, which IIUC is
> internal to Flink sink.
>
> I'm overall +1 to change numXXXOut back to its original semantics. AFAIK
> (from meetup / flink-forward questionnaires), most users do not migrate to a
> new Flink release immediately, but wait until the next 1-2 major releases are out.
>
> Best,
>
> Xintong
>
>
> On Mon, Oct 10, 2022 at 5:26 PM Martijn Visser wrote:
>
>> Hi Qingsheng,
>>
>> Do you have any idea what has happened in the process here? Do we know why
>> they were changed? I was under the impression that these metric names were
>> newly introduced due to the new interfaces and because it still depends on
>> each connector implementing these.
>>
>> Sidenote: metric names are not mentioned in the FLIP process as a public
>> API. Might make sense to have a separate follow-up to add that to the list
>> (I do think we should list them there).
>>
>> +1 for reverting this and make this change in Flink 1.16
>>
>> I'm not in favour of releasing a Flink 1.15.3 with this change: I think
>> the
>> impact is too big for a patch version, especially given how long Flink
>> 1.15
>> is already out there.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Mon, Oct 10, 2022 at 11:13 AM Leonard Xu  wrote:
>>
>> > Thanks Qingsheng for starting this thread.
>> >
>> > +1 on reverting sink metric name and releasing 1.15.3 to fix this
>> > inconsistent behavior.
>> >
>> >
>> > Best,
>> > Leonard
>> >
>> >
>> >
>> >
>> >
>> > On Oct 10, 2022, at 3:06 PM, Jark Wu wrote:
>> >
>> > Thanks for discovering this problem, Qingsheng!
>> >

[ANNOUNCE] Apache Flink 1.14.6 released

2022-09-27 Thread Xingbo Huang
The Apache Flink community is very happy to announce the release of Apache
Flink 1.14.6, which is the fifth bugfix release for the Apache Flink 1.14
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2022/09/28/release-1.14.6.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351834

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Xingbo


Re: Unable to run pyflink job - NetUtils getAvailablePort Error

2022-09-06 Thread Xingbo Huang
Hi Ramana,

This problem comes from an inconsistency between your Flink version and
your PyFlink version.
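
A hedged sketch for comparing the two versions: it reads the version of the installed `apache-flink` Python package via `importlib.metadata` (Python 3.8+) and compares it with the Flink distribution version, which is assumed here to be supplied by hand. The helper names and the example version string are illustrative only.

```python
from importlib import metadata


def installed_version(package: str = "apache-flink"):
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def same_version(pyflink_version: str, flink_version: str) -> bool:
    """Compare the two version strings after trimming whitespace."""
    return pyflink_version.strip() == flink_version.strip()


if __name__ == "__main__":
    cluster_version = "1.15.2"  # assumption: the version of the Flink distribution in use
    local = installed_version()
    print("PyFlink:", local, "| Flink:", cluster_version)
    if local is not None:
        print("match:", same_version(local, cluster_version))
```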

Best,
Xingbo

Ramana wrote on Tue, Sep 6, 2022 at 15:08:

> Hello there,
>
> I have a pyflink setup of 1 : JobManager - 1 : Task Manager.
>
> Trying to run a pyflink job and no matter what i do, i get the following
> error message.
>
> -
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: java.lang.NoSuchMethodError:
> org.apache.flink.util.NetUtils.getAvailablePort()Lorg/apacheflink/util/NetUtils$Port;
> 
> Caused by: java.lang.NoSuchMethodError:
> org.apache.flink.util.NetUtils.getAvailablePort()Lorg/apacheflink/util/NetUtils$Port;
> at
> org.apache.flink.client.python.PythonEnvUtils.lambda$startGatewayServer$3(PythonUtils.java:365)
> at java.lang.Thread.run(Thread.java:750)
> 
>
> Tried executing with some out of the box examples, yet I get the same
> error above.
>
> Could anybody shed some light on why the error is occurring, and how I can
> have it resolved?
>
> Appreciate any help here.
>
> Thanks.
> Ramana
> --
> DREAM IT, DO IT
>


Re: [ANNOUNCE] Apache Flink 1.15.2 released

2022-08-24 Thread Xingbo Huang
Thanks Danny for driving this release

Best,
Xingbo

Jing Ge wrote on Thu, Aug 25, 2022 at 05:50:

> Thanks Danny for your effort!
>
> Best regards,
> Jing
>
> On Wed, Aug 24, 2022 at 11:43 PM Danny Cranmer wrote:
>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.15.2, which is the second bugfix release for the Apache
>> Flink 1.15 series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2022/08/25/release-1.15.2.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351829
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Danny Cranmer
>>
>


Re: Some questions about migrating from Storm to Flink

2022-07-28 Thread Xingbo Huang
Hi,

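(1) 1.15 does not provide the various built-in windows on the Python
DataStream API; they have to be custom-defined. In 1.16 we have filled in
this support, so window support will be complete. There is currently no plan
to support async I/O; one consideration is that PyFlink's existing process
mode [1] already executes asynchronously. Join can only be supported in
thread mode and will be supported later.

(2) For the case where the main program is PyFlink and parts are Java, you can try Py4J.

(3) For the case where the main program is Java and parts should use Python,
quite a few users do have this need. You can consider using a Java function +
pemja [2].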

Best,
Xingbo

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/python_execution_mode/
[2] https://github.com/alibaba/pemja

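yidan zhao wrote on Wed, Jul 27, 2022 at 17:20:

> (1)
> How well is PyFlink supported at the moment? Looking at the 1.15.0 code, the
> examples contain no window demo, and from the source it seems there is no
> ready-to-use WindowAssigner, so everything has to be custom-defined?
> It seems only master has them. How usable is this part currently? It feels
> like it is not yet complete.
>
> Also, async I/O does not seem to be supported at the moment, and neither is join, correct?
> With PyFlink as it is today, how can I use APIs that PyFlink does not support but Java does?
>
> (2)
> I would also like to know whether the reverse usage exists: the main program
> is in Java, but some operators are implemented in Python. Note that I am
> talking about the DataStream API throughout, not batch.
> For example, a flatMap implemented in Python that runs model prediction on
> each element, while the main program uses Java. Is there a way to implement this?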
>
> yidan zhao wrote on Wed, Jul 13, 2022 at 21:54:
> >
> > Thanks for the answer.
> >
> > Xingbo Huang wrote on Wed, Jul 13, 2022 at 16:55:
> > >
> > > Hi,
> > >
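> > > Simply put, if your job logic uses only pure Java operators, for example
> > > a SQL/Table API job that uses no Python UDFs, then Python is not needed
> > > at runtime. If you use Python UDFs, or write your job with the Python
> > > DataStream API, then a Python environment is required at runtime, because
> > > the logic of those Python user-defined functions has to be executed by a
> > > Python interpreter.
> > >
> > > PyFlink's runtime has two execution modes, process and thread. Process
> > > mode uses inter-process communication built on the Apache Beam
> > > portability framework, with Python user-defined functions running in a
> > > dedicated Python process. Thread mode embeds Python via pemja [1], so
> > > that Python runs directly inside the JVM; it was introduced in 1.15. See
> > > the documentation [2] for details.
> > >
> > > As for performance: if you use no Python user-defined functions,
> > > performance is exactly the same as Java, because you are essentially only
> > > using PyFlink's API. If you do use Python user-defined functions, it
> > > depends on where your performance bottleneck is, since Python functions
> > > are known to be slower than Java functions. I have previously written a
> > > dedicated article [3] analyzing the framework-level overhead.
> > >
> > > I hope this helps.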
> > >
> > > Best,
> > > Xingbo
> > >
> > > [1] https://github.com/alibaba/pemja
> > > [2]
> > >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/python_execution_mode/
> > > [3] https://flink.apache.org/2022/05/06/pyflink-1.15-thread-mode.html
> > >
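> > > yidan zhao wrote on Wed, Jul 13, 2022 at 15:47:
> > >
> > > > I have just looked at PyFlink and would like to know: does a PyFlink
> > > > job also run in a dual Java + Python environment at runtime?
> > > > Does it involve interaction between Java and Python? Will performance
> > > > differ from a job developed directly in Java?
> > > >
> > > > yidan zhao wrote on Tue, Jul 12, 2022 at 19:27:
> > > > >
> > > > > Some projects at my company were developed on Storm, and we now want
> > > > > to rework them, planning to use Flink.
> > > > >
> > > > > From a first look at the code, the Storm implementation calls all
> > > > > kinds of shell and Python scripts through multi-lang. These shell and
> > > > > Python scripts communicate with the parent process mainly through a
> > > > > base package, storm.py, provided by Storm (apparently over stdin and
> > > > > stdout).
> > > > >
> > > > > How should this be migrated?
> > > > > First, in terms of overall direction: (1) rework the Python and shell
> > > > > parts as well; (2) keep the Python and shell parts and implement a
> > > > > similar mechanism on top of Flink.
> > > > >
> > > > > Both (1) and (2) currently look very complicated.
> > > > >
> > > > > Has anyone done something similar?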
> > > >
>


Re: Unable to unsubscribe by email

2022-07-13 Thread Xingbo Huang
Hi, to unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org

Best,
Xingbo

Summer wrote on Thu, Jul 14, 2022 at 11:03:

>
> Hello, what is the cause of this?
> I logged in to Huawei mail through the web page and could not unsubscribe.
>
>
> Reporting-MTA: dns; localhost
> Received-From-MTA: dns; localhost
>
> Final-Recipient: rfc822; user-zh-unsubscr...@flink.apache.org
> Action: failed
> Status: 504 5.5.2 : Helo command rejected: need fully-qualified hostname
> Diagnostic-Code: smtp; 504 5.5.2 : Helo command rejected: need
> fully-qualified hostname
> Last-Attempt-Date: Thu, 14 Jul 2022 10:59:42 X (GMT+08:00)
>
>


Re: Some questions about migrating from Storm to Flink

2022-07-13 Thread Xingbo Huang
Hi,

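Simply put, if your job logic uses only pure Java operators, for example a
SQL/Table API job that uses no Python UDFs, then Python is not needed at
runtime. If you use Python UDFs, or write your job with the Python DataStream
API, then a Python environment is required at runtime, because the logic of
those Python user-defined functions has to be executed by a Python interpreter.

PyFlink's runtime has two execution modes, process and thread. Process mode
uses inter-process communication built on the Apache Beam portability
framework, with Python user-defined functions running in a dedicated Python
process. Thread mode embeds Python via pemja [1], so that Python runs directly
inside the JVM; it was introduced in 1.15. See the documentation [2] for details.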
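
As a rough illustration of choosing between the two modes, the sketch below builds the `python.execution-mode` configuration entry described in the documentation [2]. The helper names are made up, and `apply_options` assumes a TableEnvironment whose `get_config().set(key, value)` accepts string options; treat that call as an assumption to verify against your PyFlink version.

```python
VALID_MODES = ("process", "thread")


def execution_mode_options(mode: str) -> dict:
    """Build the configuration entry that selects the Python execution mode."""
    if mode not in VALID_MODES:
        raise ValueError(f"unknown execution mode: {mode!r}")
    return {"python.execution-mode": mode}


def apply_options(table_env, options: dict) -> None:
    """Copy the options into the configuration of a TableEnvironment-like object."""
    for key, value in options.items():
        table_env.get_config().set(key, value)


if __name__ == "__main__":
    print(execution_mode_options("thread"))
```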

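As for performance: if you use no Python user-defined functions, performance
is exactly the same as Java, because you are essentially only using PyFlink's
API. If you do use Python user-defined functions, it depends on where your
performance bottleneck is, since Python functions are known to be slower than
Java functions. I have previously written a dedicated article [3] analyzing
the framework-level overhead.

I hope this helps.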

Best,
Xingbo

[1] https://github.com/alibaba/pemja
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/python_execution_mode/
[3] https://flink.apache.org/2022/05/06/pyflink-1.15-thread-mode.html

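yidan zhao wrote on Wed, Jul 13, 2022 at 15:47:

> I have just looked at PyFlink and would like to know: does a PyFlink job
> also run in a dual Java + Python environment at runtime?
> Does it involve interaction between Java and Python? Will performance differ
> from a job developed directly in Java?
>
> yidan zhao wrote on Tue, Jul 12, 2022 at 19:27:
> >
> > Some projects at my company were developed on Storm, and we now want to
> > rework them, planning to use Flink.
> >
> > From a first look at the code, the Storm implementation calls all kinds of
> > shell and Python scripts through multi-lang. These shell and Python scripts
> > communicate with the parent process mainly through a base package,
> > storm.py, provided by Storm (apparently over stdin and stdout).
> >
> > How should this be migrated?
> > First, in terms of overall direction: (1) rework the Python and shell parts
> > as well; (2) keep the Python and shell parts and implement a similar
> > mechanism on top of Flink.
> >
> > Both (1) and (2) currently look very complicated.
> >
> > Has anyone done something similar?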
>


Re: Guide for building Flink image with Python doesn't work

2022-07-07 Thread Xingbo Huang
Hi Gyula,

According to the log, you downloaded the source package of pemja rather than
the wheel package [1]. I guess you are using an M1 machine. If you install
pemja from the source package, you need a JDK, gcc tools, and CPython with
NumPy in the environment. I believe this can be solved once those tools are
in place, but other dependencies of PyFlink 1.15 do not support M1, which
makes it impossible to install and use PyFlink 1.15 on M1.
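
To confirm the guess about the machine, the CPU architecture can be checked from Python inside the build environment. This is a small sketch under the assumption that an ARM build reports `arm64` (macOS) or `aarch64` (Linux); the helper name is illustrative.

```python
import platform

ARM_MACHINES = {"arm64", "aarch64"}


def is_arm(machine: str) -> bool:
    """True if the machine string names a 64-bit ARM CPU (e.g. Apple M1)."""
    return machine.lower() in ARM_MACHINES


if __name__ == "__main__":
    machine = platform.machine()
    print(machine, "-> ARM:", is_arm(machine))
```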

We have supported M1 since release 1.16 [2]. If a large number of M1 users
have strong demand for PyFlink 1.15, I think we need to consider whether it
is necessary to backport this support to 1.15, which would break our
compatibility between minor versions.

Best,
Xingbo

[1] https://pypi.org/project/pemja/0.1.4/
[2] https://issues.apache.org/jira/browse/FLINK-25188

Gyula Fóra wrote on Wed, Jul 6, 2022 at 13:56:

> Here it is, copied from the docs essentially:
>
> FROM flink:1.15.0
>
>
> # install python3: it has updated Python to 3.9 in Debian 11 and so
> install Python 3.7 from source
> # it currently only supports Python 3.6, 3.7 and 3.8 in PyFlink officially.
>
> RUN apt-get update -y && \
> apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev
> libffi-dev git && \
> wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
> tar -xvf Python-3.7.9.tgz && \
> cd Python-3.7.9 && \
> ./configure --without-tests --enable-shared && \
> make -j6 && \
> make install && \
> ldconfig /usr/local/lib && \
> cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
> ln -s /usr/local/bin/python3 /usr/local/bin/python && \
> apt-get clean && \
> rm -rf /var/lib/apt/lists/*
>
> # install PyFlink
> RUN pip3 install apache-flink==1.15.0
>
> And I am running:
> docker build --tag pyflink:latest .
>
> This gives the following error:
>
>
> *#6 64.12  cwd: /tmp/pip-install-9_farey_/pemja/#6 64.12
> Complete output (1 lines):#6 64.12 Include folder should be at
> '/usr/local/openjdk-11/include' but doesn't exist. Please check you've
> installed the JDK properly.*
>
> A side note:
> The Dockerfile in the docs is missing git so initially I got the following
> error:
>
> *#7 57.73 raise OSError("%r was not found" % name)#7 57.73
> OSError: 'git' was not found *
>
> @Weihua Hu  can you please send your working
> Dockerfile?
>
> Gyula
>
> On Wed, Jul 6, 2022 at 4:16 AM Weihua Hu  wrote:
>
>> Hi Gyula,
>>
>> I can build pyFlink image successfully by following this guide. Did you
>> add a dependency outside of the documentation? And could you provide your
>> Dockerfile
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker
>>
>> Best,
>> Weihua
>>
>>
>> On Tue, Jul 5, 2022 at 11:40 PM Gyula Fóra  wrote:
>>
>>> Well in any case either the official image is incorrect (maybe we should
>>> include JDK by default not JRE) or we should update the
>>> documentation regarding the python docker build because it simply doesn't
>>> work at the moment.
>>>
>>> I am still looking for a full working example that adds the required
>>> Python packages on top of a Flink 1.15.0 base image :)
>>>
>>> Gyula
>>>
>>> On Tue, Jul 5, 2022 at 5:36 PM Weihua Hu  wrote:
>>>
>>>> In addition, you can try providing the Dockerfile
>>>>
>>>> Best,
>>>> Weihua
>>>>
>>>>
>>>> On Tue, Jul 5, 2022 at 11:24 PM Weihua Hu
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> The base image flink:1.15.0 is built from openjdk:11-jre, and this
>>>>> image only installs jre but not jdk.
>>>>> It looks like the package you want to install (pemja) depends on jdk.
>>>>> you need install openjdk-11-jdk in dockerfile,
>>>>> take a look to how it is installed in the official image:
>>>>>
>>>>>
>>>>> https://hub.docker.com/layers/openjdk/library/openjdk/11-jdk/images/sha256-bc0af19c7c4f492fe6ff0c1d1c8c0e5dd90ab801385b220347bb91dbe2b4094f?context=explore
>>>>>
>>>>>
>>>>> Best,
>>>>> Weihua
>>>>>
>>>>>
>>>>> On Tue, Jul 5, 2022 at 3:50 PM Gyula Fóra
>>>>> wrote:
>>>>>
>>>>>> Hi All!
>>>>>>
>>>>>> I have been trying to experiment with the Flink python support on
>>>>>> Kubernetes but I got stuck creating a custom image with all the necessary
>>>>>> python libraries.
>>>>>>
>>>>>> I found this guide in the docs:
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker
>>>>>>
>>>>>> However when I try to build a custom image using it, I get the
>>>>>> following error:
>>>>>>
>>>>>> #7 131.7 Collecting pemja==0.1.4
>>>>>> #7 131.8   Downloading pemja-0.1.4.tar.gz (32 kB)
>>>>>> #7 131.9 ERROR: Command errored out with exit status 255:
>>>>>> #7 131.9  command: /usr/local/bin/python3.7 -c 'import sys,
>>>>>> setuptools, tokenize; sys.argv[0] =
>>>>>> '"'"'/tmp/pip-install-y6o6djs1/pemja/setup.py'"'"';
>>>>>> __file__='"'"'/tmp/pip-install-y6o6djs1/pemja/setup.py'"'"';f=getattr(tokenize,
>>>>>> '"'"'open'"'"', 

Re: [ANNOUNCE] Apache Flink 1.15.1 released

2022-07-07 Thread Xingbo Huang
Thanks a lot for being our release manager David and everyone who
contributed.

Best,
Xingbo

David Anderson wrote on Fri, Jul 8, 2022 at 06:18:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.15.1, which is the first bugfix release for the Apache Flink 1.15
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
>
> https://flink.apache.org/news/2022/07/06/release-1.15.1.html
>
> The full release notes are available in Jira:
>
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351546
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> David Anderson
>


[ANNOUNCE] Apache Flink 1.14.5 released

2022-06-21 Thread Xingbo Huang
The Apache Flink community is very happy to announce the release of Apache
Flink 1.14.5, which is the fourth bugfix release for the Apache Flink 1.14
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2022/06/22/release-1.14.5.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351388

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Xingbo


Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

2022-06-16 Thread Xingbo Huang
Hi John,

Because I can't see your code, I can only provide some possible reasons for
this error:
1. This error generally occurs in jobs that convert between Table and
DataStream. But given that you said you only use SQL + Python UDFs, this
shouldn't be the case.
2. The default value of `taskmanager.memory.managed.consumer-weights` is
`OPERATOR:70,STATE_BACKEND:70,PYTHON:30`, so in your case there is
actually no need to set it to `PYTHON:30`.
3. In fact, for pure SQL + Python UDF jobs, unless you set an invalid value
such as `PYTHON:0` in `taskmanager.memory.managed.consumer-weights`, I really
can't think of any situation where this problem would occur.
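
To make the condition in the error message concrete, here is a small
pure-Python sketch that parses a consumer-weights string and checks whether
the `PYTHON` consumer has a positive weight. It only mirrors the condition
named in the error message; the function names are hypothetical and this is
not Flink's own validation code:

```python
def parse_consumer_weights(value: str) -> dict:
    """Parse a string such as 'OPERATOR:70,PYTHON:30' into {name: weight}."""
    weights = {}
    for item in value.split(","):
        name, _, weight = item.strip().partition(":")
        weights[name.strip().upper()] = int(weight)
    return weights


def python_weight_is_valid(value: str) -> bool:
    """True if the PYTHON consumer is present and has a weight above zero."""
    return parse_consumer_weights(value).get("PYTHON", 0) > 0


# The default quoted above passes the check; a missing or zero PYTHON entry
# is the situation the error message describes.
print(python_weight_is_valid("OPERATOR:70,STATE_BACKEND:70,PYTHON:30"))
print(python_weight_is_valid("OPERATOR:70,STATE_BACKEND:70"))
```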

Best,
Xingbo

John Tipper wrote on Thu, Jun 16, 2022 at 19:41:

> Hi Xingbo,
>
> Yes, there are a number of temporary views being created, where each is
> being created using SQL (CREATE TEMPORARY VIEW ...) rather than explicit
> calls to the Table and DataStream APIs.
>
> Is this a good pattern or are there caveats I should be aware of please?
>
> Many thanks,
>
> John
>
>
> ------
> *From:* Xingbo Huang 
> *Sent:* 16 June 2022 12:34
> *To:* John Tipper 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: The configured managed memory fraction for Python worker
> process must be within (0, 1], was: %s
>
> Hi John,
>
> Does your job logic include conversion between Table and DataStream? For
> example, methods such as `create_temporary_view(path: str, data_stream:
> DataStream): -> Table`  are used.
>
> Best,
> Xingbo
>
> John Tipper wrote on Thu, Jun 16, 2022 at 18:31:
>
> Hi Xingbo,
>
> I’m afraid I can’t share my code but Flink is 1.13. The main Flink code is
> running inside Kinesis on AWS so I cannot change the version.
>
> Many thanks,
>
> John
>
> Sent from my iPhone
>
> On 16 Jun 2022, at 10:37, Xingbo Huang  wrote:
>
> 
> Hi John,
>
> Could you provide the code snippet and the version of pyflink you used?
>
> Best,
> Xingbo
>
>
> John Tipper wrote on Thu, Jun 16, 2022 at 17:05:
>
> Hi all,
>
> I'm trying to run a PyFlink unit test to test some PyFlink SQL and where
> my code uses a Python UDF.  I can't share my code but the test case is
> similar to the code here:
> https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py
>   When
> I have some simple SQL everything is fine. When I add a more complex query
> I get an error, which looks like it's memory related.
>
> java.lang.IllegalArgumentException: The configured managed memory fraction
> for Python worker process must be within (0, 1], was: %s. It may be because
> the consumer type "Python" was missing or set to 0 for the config option
> "taskmanager.memory.managed.consumer-weights".0.0
>
>
>
> In my test case setUp(), I try to set that value like this, but it seems
> to have no effect:
>
> self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights",
> "PYTHON:30")
>
>
> Am I not setting it correctly, or is there something else I need to do to
> fix this error?
>
> Many thanks,
>
> John
>
>


Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

2022-06-16 Thread Xingbo Huang
Hi John,

Does your job logic include conversion between Table and DataStream? For
example, methods such as `create_temporary_view(path: str, data_stream:
DataStream): -> Table`  are used.

Best,
Xingbo

John Tipper wrote on Thu, Jun 16, 2022 at 18:31:

> Hi Xingbo,
>
> I’m afraid I can’t share my code but Flink is 1.13. The main Flink code is
> running inside Kinesis on AWS so I cannot change the version.
>
> Many thanks,
>
> John
>
> Sent from my iPhone
>
> On 16 Jun 2022, at 10:37, Xingbo Huang  wrote:
>
> 
> Hi John,
>
> Could you provide the code snippet and the version of pyflink you used?
>
> Best,
> Xingbo
>
>
> John Tipper wrote on Thu, Jun 16, 2022 at 17:05:
>
>> Hi all,
>>
>> I'm trying to run a PyFlink unit test to test some PyFlink SQL and where
>> my code uses a Python UDF.  I can't share my code but the test case is
>> similar to the code here:
>> https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py
>>   When
>> I have some simple SQL everything is fine. When I add a more complex query
>> I get an error, which looks like it's memory related.
>>
>> java.lang.IllegalArgumentException: The configured managed memory fraction
>> for Python worker process must be within (0, 1], was: %s. It may be because
>> the consumer type "Python" was missing or set to 0 for the config option
>> "taskmanager.memory.managed.consumer-weights".0.0
>>
>>
>>
>> In my test case setUp(), I try to set that value like this, but it seems
>> to have no effect:
>>
>> self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights",
>> "PYTHON:30")
>>
>>
>> Am I not setting it correctly, or is there something else I need to do to
>> fix this error?
>>
>> Many thanks,
>>
>> John
>>
>>


Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

2022-06-16 Thread Xingbo Huang
Hi John,

Could you provide the code snippet and the version of pyflink you used?

Best,
Xingbo


John Tipper wrote on Thu, Jun 16, 2022 at 17:05:

> Hi all,
>
> I'm trying to run a PyFlink unit test to test some PyFlink SQL and where
> my code uses a Python UDF.  I can't share my code but the test case is
> similar to the code here:
> https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py
>   When
> I have some simple SQL everything is fine. When I add a more complex query
> I get an error, which looks like it's memory related.
>
> java.lang.IllegalArgumentException: The configured managed memory fraction
> for Python worker process must be within (0, 1], was: %s. It may be because
> the consumer type "Python" was missing or set to 0 for the config option
> "taskmanager.memory.managed.consumer-weights".0.0
>
>
>
> In my test case setUp(), I try to set that value like this, but it seems
> to have no effect:
>
> self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights",
> "PYTHON:30")
>
>
> Am I not setting it correctly, or is there something else I need to do to
> fix this error?
>
> Many thanks,
>
> John
>
>


Re: About the PyFlink development environment

2022-06-15 Thread Xingbo Huang
Hi,

You can run pip install -r flink-python/dev/dev-requirements.txt to install
the dependencies required for the development environment.

Best,
Xingbo

张 兴博 wrote on Wed, Jun 15, 2022 at 10:20:

> Hello,
> I am learning to use PyFlink and want to develop with it on Ubuntu 20.04,
> but when I run my code I get the following error:
>
> Traceback (most recent call last):
>   File "/root/.py", line 6, in 
> s_env = StreamExecutionEnvironment.get_execution_environment()
>   File
> "/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
> line 805, in get_execution_environment
> return StreamExecutionEnvironment(j_stream_exection_environment)
>   File
> "/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
> line 62, in __init__
> self._open()
>   File
> "/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
> line 973, in _open
> startup_loopback_server()
>   File
> "/usr/local/lib/python3.8/dist-packages/pyflink/datastream/stream_execution_environment.py",
> line 963, in startup_loopback_server
> from pyflink.fn_execution.beam.beam_worker_pool_service import \
>   File
> "/usr/local/lib/python3.8/dist-packages/pyflink/fn_execution/beam/beam_worker_pool_service.py",
> line 31, in 
> from apache_beam.options.pipeline_options import DebugOptions
>   File "/usr/local/lib/python3.8/dist-packages/apache_beam/__init__.py",
> line 96, in 
> from apache_beam import io
>   File
> "/usr/local/lib/python3.8/dist-packages/apache_beam/io/__init__.py", line
> 23, in 
> from apache_beam.io.avroio import *
>   File "/usr/local/lib/python3.8/dist-packages/apache_beam/io/avroio.py",
> line 63, in 
> from apache_beam.io import filebasedsink
>   File
> "/usr/local/lib/python3.8/dist-packages/apache_beam/io/filebasedsink.py",
> line 36, in 
> from apache_beam.io import iobase
>   File "/usr/local/lib/python3.8/dist-packages/apache_beam/io/iobase.py",
> line 57, in 
> from apache_beam.transforms import Impulse
>   File
> "/usr/local/lib/python3.8/dist-packages/apache_beam/transforms/__init__.py",
> line 25, in 
> from apache_beam.transforms.external import *
>   File
> "/usr/local/lib/python3.8/dist-packages/apache_beam/transforms/external.py",
> line 45, in 
> from apache_beam.runners import pipeline_context
>   File
> "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/pipeline_context.py",
> line 51, in 
> from apache_beam.transforms import environments
>   File
> "/usr/local/lib/python3.8/dist-packages/apache_beam/transforms/environments.py",
> line 54, in 
> from apache_beam.runners.portability.sdk_container_builder import
> SdkContainerImageBuilder
>   File
> "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/sdk_container_builder.py",
> line 44, in 
> from apache_beam.internal.gcp.auth import get_service_credentials
>   File
> "/usr/local/lib/python3.8/dist-packages/apache_beam/internal/gcp/auth.py",
> line 28, in 
> from oauth2client.client import GoogleCredentials
>   File "/usr/local/lib/python3.8/dist-packages/oauth2client/client.py",
> line 39, in 
> from oauth2client import transport
>   File "/usr/local/lib/python3.8/dist-packages/oauth2client/transport.py",
> line 17, in 
> import httplib2
> ModuleNotFoundError: No module named 'httplib2'
>
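> From my search, it seems httplib2 is no longer used in newer Python
> versions and the name is now http.client?
> My Python version is 3.8.10 and my JDK is OpenJDK 11.0.15 (another machine
> has Java 1.8).
> What causes this, and how can I solve it?
>
> Thank you for taking the time to answer my question, much appreciated!
>
> Sent from Mail for Windows 11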
>
>


Re: Help installing apache-flink and numpy to run flink python examples

2022-06-02 Thread Xingbo Huang
Hi Kenneth,

In Flink 1.15, PyFlink only guarantees support for Python 3.6, 3.7 and
3.8[1]. In release-1.16, PyFlink will provide support for Python 3.9[2].

Back to your installation error. In Flink 1.15, the version range of
numpy that PyFlink depends on is numpy>=1.14.3,<1.20. So when you execute
`pip install apache-flink`, pip tries to install numpy 1.19.5. However,
numpy 1.19.5 does not provide a wheel package for Python 3.10[3], so it is
built from source, which often fails. For details, you can refer to the
numpy guide on building from source[4]. The reason `pip install numpy`
succeeded is that it installs the latest version, 1.22.4, by default, and
that version provides a wheel package for Python 3.10[5].

From my point of view, your best option for now is to create a Python 3.8
virtual environment with conda and then install PyFlink.
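
As a quick sanity check before running `pip install apache-flink`, a sketch
like the following compares the running interpreter with the range stated
above for Flink 1.15. The table and function name are assumptions made for
illustration, not part of PyFlink:

```python
import sys

# Supported (lowest, highest) Python versions per Flink release, taken from
# the statement above for 1.15 only.
SUPPORTED_PYTHON = {"1.15": ((3, 6), (3, 8))}


def is_supported(python_version, flink_version: str = "1.15") -> bool:
    """Check a (major, minor, ...) version tuple against the supported range."""
    lowest, highest = SUPPORTED_PYTHON[flink_version]
    return lowest <= tuple(python_version[:2]) <= highest


# Reports whether the current interpreter falls inside the 1.15 range.
print(is_supported(sys.version_info))
```

With Python 3.10.4, as in the original report, the check returns False, which
matches the failed numpy source build.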

Best,
Xingbo

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/installation/#environment-requirements
[2] https://issues.apache.org/jira/browse/FLINK-27058
[3] https://pypi.org/project/numpy/1.19.5/#files
[4] https://numpy.org/doc/stable/user/building.html
[5] https://pypi.org/project/numpy/1.22.4/#files

Xuyang wrote on Thu, Jun 2, 2022 at 10:34:

> Hi, Kenneth, have you tried the setup file[1] in the Flink-Python? It
> maybe can work.
>
> [1] https://github.com/apache/flink/blob/master/flink-python/setup.py
>
> At 2022-06-02 03:53:36, "Kenneth Shine" 
> wrote:
>
> I am using MacOS Monterey 12.3.1
>
> I have download flink source and examples from GitHub
>
> I have installed python 3.10.4 from
> https://www.python.org/downloads/macos/
>
> In flink/libexec/examples/python/datastream, I run
>
>
>
> /usr/local/bin/python3 word_count.py
>
> But get error
>
>
>
> Traceback (most recent call last):
>
>   File "
> /usr/local/Cellar/apache-flink/1.15.0/libexec/examples/python/datastream/word_count.py",
> line 22, in 
>
> from pyflink.common import WatermarkStrategy, Encoder, Types
>
> ModuleNotFoundError: No module named 'pyflink'
>
>
>
> To install pyflink, I run
>
> /usr/local/bin/python3 -m pip install apache-flink
>
> But get error
>
>   × Encountered error while trying to install package.
>
>   ╰─> numpy
>
>
>
> So I install numpy
>
> /usr/local/bin/python3 -m pip install numpy
>
> Which is successful.
>
>
>
> Yet when I attempt to install pyflink now
>
> /usr/local/bin/python3 -m pip install apache-flink
>
> I get the same error regarding numpy.
>
> How can I get around this error?
>
>
>
>  The error messages from installing pyFlink are quite long.  I have
> attached more of the end of the error message.
>
>
> Thank you for any help you can give me.
>
>
>
>


Re: Pyflink elastic search connectors

2022-03-29 Thread Xingbo Huang
Hi,

Are you using the DataStream API or the Table API? If you are using the
Table API, you can use the connector by executing SQL[1]. If you are using
the DataStream API, there is indeed no ES connector API provided; you need to
write Python wrapper code, but the wrapper code is very simple. The
underlying code makes use of py4j to call the Java API of the ES connector.
For details, you can refer to the wrapper code for Kafka or Pulsar[2].
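
For the Table API route, a minimal sketch could look like the following. The
table name, columns, host and index are made-up values, and it assumes the
Elasticsearch 7 SQL connector jar is already available to the job:

```python
# DDL for a hypothetical Elasticsearch sink table; all names and the host
# are placeholders chosen for illustration.
ES_SINK_DDL = """
CREATE TABLE es_sink (
    user_id STRING,
    user_name STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'users'
)
"""


def register_es_sink(t_env) -> None:
    """Register the sink table on an existing TableEnvironment."""
    t_env.execute_sql(ES_SINK_DDL)
```

After registration, rows can be written with an ordinary
`INSERT INTO es_sink SELECT ...` statement executed through the same
environment.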

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/overview/
[2]
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py

Best,
Xingbo

Sandeep Sharat wrote on Tue, Mar 29, 2022 at 20:51:

> Hello Everyone,
>
> I have been working on a streaming application using elasticsearch as the
> sink. I had achieved it using the java api quite easily. But due to a
> recent policy change we are moving towards the python api for flink,
> however we were unable to find any python elastic search connectors for
> flink. We were able to find support for the kafka connectors in python.
> Does it mean that we have to write our own connectors in python  to
> make use of the flink-elasticsearch connector jar?
>
> Thanks in advance
> --
> Thanks & Regards
> Sandeep Sharat Kumar
>


Re: [ANNOUNCE] Apache Flink 1.14.4 released

2022-03-16 Thread Xingbo Huang
Hi Konstantin,

I have installed the wheel packages of 1.13.6 and 1.14.4 respectively, and
tested them through some examples. Thanks a lot for your work.

Best,
Xingbo

Konstantin Knauf wrote on Wed, Mar 16, 2022 at 15:29:

> Hi Xingbo,
>
> you are totally right. Thank you for noticing. This also affected Flink
> 1.13.6, the other release I was recently managing. I simply skipped a step
> in the release guide.
>
> It should be fixed now. Could you double-check?
>
> Cheers,
>
> Konstantin
>
> On Wed, Mar 16, 2022 at 4:07 AM Xingbo Huang  wrote:
>
> > Thanks a lot for being our release manager Konstantin and everyone who
> > contributed. I have a question about pyflink. I see that there are no
> > corresponding wheel packages uploaded on pypi, only the source package is
> > uploaded. Is there something wrong with building the wheel packages?
> >
> > Best,
> > Xingbo
> >
> > Leonard Xu wrote on Wed, Mar 16, 2022 at 01:02:
> >
> >> Thanks a lot for being our release manager Konstantin and everyone who
> >> involved!
> >>
> >> Best,
> >> Leonard
> >>
> >> On Mar 15, 2022, at 9:34 PM, Martijn Visser wrote:
> >>
> >> Thank you Konstantin and everyone who contributed!
> >>
> >>
> >>
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>


Re: [ANNOUNCE] Apache Flink 1.14.4 released

2022-03-15 Thread Xingbo Huang
Thanks a lot for being our release manager Konstantin and everyone who
contributed. I have a question about pyflink. I see that there are no
corresponding wheel packages uploaded on pypi, only the source package is
uploaded. Is there something wrong with building the wheel packages?

Best,
Xingbo

Leonard Xu wrote on Wed, Mar 16, 2022 at 01:02:

> Thanks a lot for being our release manager Konstantin and everyone who
> involved!
>
> Best,
> Leonard
>
> On Mar 15, 2022, at 9:34 PM, Martijn Visser wrote:
>
> Thank you Konstantin and everyone who contributed!
>
>
>


Re: pyflink object to java object

2022-02-28 Thread Xingbo Huang
Hi,
With py4j, you can call any Java method. On how to create a Java Row, you
can call the `createRowWithNamedPositions` method of `RowUtils`[1].
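
A rough sketch of that call through py4j, assuming the method takes a
RowKind, an Object[] of field values and a LinkedHashMap from field name to
position, and that the PyFlink gateway is obtained with `get_gateway()`. The
helper names are hypothetical:

```python
def positions_by_name(field_names) -> dict:
    """Map each field name to its position, preserving the given order."""
    return {name: index for index, name in enumerate(field_names)}


def to_java_row(field_names, values):
    """Build a Java Row from parallel lists of names and values (sketch)."""
    # Imported lazily so that the pure helper above works without a JVM.
    from pyflink.java_gateway import get_gateway

    gateway = get_gateway()
    jvm = gateway.jvm
    # Object[] holding the field values by position.
    j_values = gateway.new_array(jvm.java.lang.Object, len(values))
    for index, value in enumerate(values):
        j_values[index] = value
    # LinkedHashMap<String, Integer> holding the name -> position mapping.
    j_positions = jvm.java.util.LinkedHashMap()
    for name, index in positions_by_name(field_names).items():
        j_positions.put(name, index)
    return jvm.org.apache.flink.types.RowUtils.createRowWithNamedPositions(
        jvm.org.apache.flink.types.RowKind.INSERT, j_values, j_positions)
```

The returned object is a py4j handle to the Java Row, so it can be passed
directly as a constructor argument to a Java class created through `jvm`.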

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/RowUtils.java#

Best,
Xingbo

Francis Conroy wrote on Fri, Feb 25, 2022 at 14:35:

> Hi all,
>
> we're using pyflink for most of our flink work and are sometimes into a
> java process function.
> Our new java process function takes an argument in the constructor
> which is a Row containing default values. I've declared my Row in pyflink
> like this:
>
> default_row = Row(ep_uuid="",
>   unit_uuid=None,
>   unit_longitude=None,
>   unit_latitude=None,
>   unit_state=None,
>   unit_country=None,
>   pf_uuid=None,
>   pf_name=None)
>
> row_type_information = RowTypeInfo([Types.STRING(),  # ep_uuid
>Types.STRING(),  # unit_uuid
>Types.DOUBLE(),  # unit_longitude
>Types.DOUBLE(),  # unit_latitude
>Types.STRING(),  # unit_state
>Types.STRING(),  # unit_country
>Types.STRING(),  # pf_uuid
>Types.STRING()  # pf_name
>])
>
> I'm now trying to get a handle to a java row object in the jvm so I can
> pass that into the process function's constructor.
>
> endpoint_info_enriched_stream = 
> DataStream(ds._j_data_stream.connect(endpoint_info_stream._j_data_stream).process(
> jvm.org.switchdin.operators.TableEnrich(j_obj)))
>
> I've tried a few approaches, but I really can't figure out how to do this,
> I'm not sure what I need on each side for this, a coder, serializer,
> pickler?
>
>
>


Re: Serving Machine Learning models

2022-01-11 Thread Xingbo Huang
Hi Sonia,

As far as I know, PyFlink users prefer to use Python UDFs[1][2] for model
prediction: load the model when the UDF is initialized, and then predict on
each new piece of data.
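
A minimal sketch of that pattern with a Table API scalar function. The
`load_model` function is a hypothetical stand-in for loading a real PyTorch
or TensorFlow model, and the import fallback exists only so the sketch can be
read and run without PyFlink installed:

```python
try:
    from pyflink.table.udf import ScalarFunction
except ImportError:  # fallback so the sketch runs without PyFlink
    ScalarFunction = object


def load_model():
    """Hypothetical loader; returns a toy linear model y = 2x + 1."""
    return lambda x: 2.0 * x + 1.0


class Predict(ScalarFunction):
    def open(self, function_context):
        # Called once per UDF instance, so the model is loaded only once.
        self.model = load_model()

    def eval(self, x):
        # Called for every incoming row.
        return self.model(x)
```

With PyFlink available, the class would typically be wrapped with
`udf(Predict(), result_type=DataTypes.DOUBLE())` and then used in SQL or
Table API expressions.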

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/table/udfs/overview/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/operators/process_function/

Best,
Xingbo

David Anderson wrote on Tue, Jan 11, 2022 at 03:39:

> Another approach that I find quite natural is to use Flink's Stateful
> Functions API [1] for model serving, and this has some nice advantages,
> such as zero-downtime deployments of new models, and the ease with which
> you can use Python. [2] is an example of this approach.
>
> [1] https://flink.apache.org/stateful-functions.html
> [2] https://github.com/ververica/flink-statefun-workshop
>
> On Fri, Jan 7, 2022 at 5:55 PM Yun Gao  wrote:
>
>> Hi Sonia,
>>
>> Sorry I might not have the statistics on the provided two methods,
>> perhaps as input
>> I could also provide another method: currently there is an eco-project
>> dl-on-flink
>> that supports running DL frameworks on top of the Flink and it will
>> handle the data
>> exchange between java and python processes, which would allow users to
>> use the native model directly.
>>
>> Best,
>> Yun
>>
>>
>> [1] https://github.com/flink-extended/dl-on-flink
>>
>>
>>
>> --
>> From:Sonia-Florina Horchidan 
>> Send Time:2022 Jan. 7 (Fri.) 17:23
>> To:user@flink.apache.org 
>> Subject:Serving Machine Learning models
>>
>> Hello,
>>
>>
>> I recently started looking into serving Machine Learning models for
>> streaming data in Flink. To give more context, that would involve training
>> a model offline (using PyTorch or TensorFlow), and calling it from inside a
>> Flink job to do online inference on newly arrived data. I have found
>> multiple discussions, presentations, and tools that could achieve this, and
>> it seems like the two alternatives would be: (1) wrap the pre-trained
>> models in a HTTP service (such as PyTorch Serve [1]) and let Flink do async
>> calls for model scoring, or (2) convert the models into a standardized
>> format (e.g., ONNX [2]), pre-load the model in memory for every task
>> manager (or use external storage if needed) and call it for each new data
>> point.
>>
>> Both approaches come with a set of advantages and drawbacks and, as far
>> as I understand, there is no "silver bullet", since one approach could be
>> more suitable than the other based on the application requirements.
>> However, I would be curious to know what would be the "recommended" methods
>> for model serving (if any) and what approaches are currently adopted by the
>> users in the wild.
>>
>> [1] https://pytorch.org/serve/
>>
>> [2] https://onnx.ai/
>>
>> Best regards,
>>
>> Sonia
>>
>>
>> Sonia-Florina Horchidan
>> PhD Student
>> KTH Royal Institute of Technology
>> *Software and Computer Systems (SCS)*
>> School of Electrical Engineering and Computer Science (EECS)
>> Mobil: +46769751562
>> sf...@kth.se,  www.kth.se
>>
>>
>>


Re: Datastream api implementation of a third party pyflink connector

2021-07-19 Thread Xingbo Huang
Hi Zhongle Wang,

Your understanding is correct. Firstly, you need to provide an
implementation of a java connector, then add this jar to the dependency[1],
and finally add a python connector wrapper.
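
For the second step, a small sketch of adding the connector jar from Python,
assuming a local jar path and the `add_jars` method of
`StreamExecutionEnvironment`. The helper names and the path are made up for
illustration:

```python
from pathlib import Path


def jar_url(path: str) -> str:
    """Turn a local jar path into the file:// URL form used for jar lists."""
    return Path(path).resolve().as_uri()


def add_connector_jar(env, path: str) -> None:
    """Register the Java connector jar on a StreamExecutionEnvironment."""
    env.add_jars(jar_url(path))


print(jar_url("/tmp/my-connector.jar"))
```

The Python wrapper class can then obtain the Java reader through the gateway,
in the same way the existing Kafka wrapper does.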

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/faq/#adding-jar-files

Best,
Xingbo

Wang, Zhongle wrote on Mon, Jul 19, 2021 at 5:43 PM:

> Hi,
>
>
>
> I’m working on a pyflink datastream connector for Pravega and wish to use
> a datasource other than Kafka.
>
>
>
> Currently the Kafka connector for the Python DataStream API is implemented
> using the `get_gateway` function, which creates a binding to Java in
> `FlinkKafkaConsumer`.
>
>
>
> So if I want to create a `FlinkPravegaReader` that consumes another data
> source such as Pravega, is it recommended to do it the same way? (We
> have a Java reader/consumer implementation.)
>
> Or the gateway thing might be changed in the future?
>
>
>
> PS: In this post(
> https://stackoverflow.com/questions/65009292/is-there-a-kinesis-connector-for-pyflink)
> Xingbo suggested that the Kinesis and other connectors will be added soon,
> but I’m not sure whether it uses the same technique mentioned above.
>
>
>
> Thanks,
>
> Zhongle Wang
>
>
>


Re: key_by problem in Pyflink

2021-07-13 Thread Xingbo Huang
Hi,

I have created a JIRA issue [1] to fix this bug; the fix will be included in
release-1.13.2. The root cause is a wrong mapping of the state key to the
state. This wrong mapping occurs when the key is switched but the state is
not accessed. In your example, the `data` state you declared is not used in
`process_element2`.

[1] https://issues.apache.org/jira/browse/FLINK-23368
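
To make the expected behaviour concrete, here is a small pure-Python model
of keyed map state. It does not use PyFlink, and `KeyedMapState`,
`set_current_key` and `product_ids_seen` are names made up for this
illustration. With a correct key-to-state mapping, a lookup made under key 1
must never see entries written under key 2, which is what the buggy runs
violated.

```python
from collections import defaultdict


class KeyedMapState:
    """Toy model of keyed MapState: one independent map per key."""

    def __init__(self):
        self._maps = defaultdict(dict)  # current key -> that key's map
        self._current_key = None

    def set_current_key(self, key):
        # The runtime switches the key before each element is processed.
        self._current_key = key

    def put(self, k, v):
        self._maps[self._current_key][k] = v

    def items(self):
        return list(self._maps[self._current_key].items())


def product_ids_seen(state):
    """Collect the distinct product ids visible under the current key."""
    return sorted({value[0] for _, value in state.items()})


data = KeyedMapState()
records = [("row_0", 1, "a_0"), ("row_1", 2, "a_1"), ("row_2", 1, "a_2")]
seen = []
for row_id, product_id, payload in records:
    data.set_current_key(product_id)  # mirrors key_by(lambda x: x[1])
    data.put(row_id, (product_id, payload))
    seen.append(product_ids_seen(data))

print(seen)  # [[1], [2], [1]]
```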

Best,
Xingbo

赵飞 wrote on Mon, Jul 12, 2021 at 10:00 PM:

> Thanks. In addition, I run the program in a local mini cluster mode, not
> sure if it would affect the results.
>

Re: key_by problem in Pyflink

2021-07-12 Thread Xingbo Huang
Hi,

I think your understanding is correct. The results seem a little weird. I'm
looking into this and will let you know when there are any findings.

Best,
Xingbo

赵飞 wrote on Mon, Jul 12, 2021 at 4:48 PM:

> Hi all,
> I'm using pyflink to develop a module, whose main functionality is
> processing user data based on specific rules. The program involves two
> datastreams: data and rule. They have different types, so I connect them
> and use the field 'product_id' as the key for the key_by method. The code is
> as follows (demo code only, not the actual code):
>
> import random
>
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import KeyedCoProcessFunction
> from pyflink.datastream.state import MapStateDescriptor
> from pyflink.datastream import RuntimeContext
>
>
> def test(data):
>     product_ids = set()
>     for key, value in data.items():
>         product_ids.add(value[0])
>     return list(product_ids)
>
>
> class MyFunction(KeyedCoProcessFunction):
>     def open(self, ctx):
>         data_desc = MapStateDescriptor('data', Types.STRING(),
>                                        Types.ROW([Types.INT()]))
>         self.data = ctx.get_map_state(data_desc)
>
>         rule_desc = MapStateDescriptor('rule', Types.STRING(),
>                                        Types.ROW([Types.INT()]))
>         self.rules = ctx.get_map_state(rule_desc)
>
>     def process_element1(self, data_value, ctx):
>         row_id, others = data_value[0], data_value[1:]
>         self.data.put(row_id, others)
>         result = []
>         for key, value_list in self.rules.items():
>             product_id, random_0, random_1 = value_list
>             # Do some calculations
>             product_ids_of_state_data = test(self.data)
>             result.append([random_0, random_1, product_id,
>                            product_ids_of_state_data])
>         return result
>
>     def process_element2(self, rule_value, ctx):
>         row_id, others = rule_value[0], rule_value[1:]
>         self.rules.put(row_id, others)
>
>
> def generate_data1(count):
>     collection = []
>     for i in range(count):
>         collection.append(['row_%d' % i, i % 2 + 1, 'a_%d' % i, i * 2])
>     return collection
>
>
> def generate_data2(count):
>     collection = []
>     for i in range(count):
>         collection.append(['row_%d' % i, random.choice([1, 2]), 'a_%d' % i,
>                            i * 2])
>     return collection
>
>
> def main():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>
>     data = env.from_collection(generate_data1(50))
>     rules = env.from_collection([
>         ['row_0', 1, 'rule1_value0', 'rule1_value1'],
>         ['row_1', 2, 'rule2_value0', 'rule2_value1']
>     ], type_info=Types.ROW([Types.STRING(), Types.INT(), Types.STRING(),
>                             Types.STRING()]))
>     results = data.connect(rules).key_by(lambda x: x[1], lambda y: y[1]) \
>         .process(MyFunction())
>     results.print()
>
>     env.execute("test_job")
>
>
> if __name__ == "__main__":
>     main()
>
>
> When processing the first datastream, which contains user data, I will
> access the registered MapState and get the unique product_id in it.
> According to the description on the official site:
>
>> Keyed state is maintained in what can be thought of as an embedded
>> key/value store. The state is partitioned and distributed strictly together
>> with the streams that are read by the stateful operators. Hence, access to
>> the key/value state is only possible on *keyed streams*, i.e. after a
>> keyed/partitioned data exchange, and *is restricted to the values
>> associated with the current event’s key*
>
>
> I assume that each access to the MapState should therefore return only one
> product_id value. But I get the following output after running the
> script:
>
>> ['rule1_value0', 'rule1_value1', 1, [1]]
>> ['rule1_value0', 'rule1_value1', 1, [1]]
>> ['rule1_value0', 'rule1_value1', 1, [1]]
>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>> ['rule1_value0', 'rule1_value1', 1, [1, 2]]
>> ['rule2_value0', 'rule2_value1', 2, [1, 2]]
>
> It shows that each user record is processed according to the matching
> rule, but the MapState also contains data of other products (the last
> element of each row). There are some other strange points:
> 1. I have two functions to generate data. The problem is more likely to
> occur with data generated by generate_data2 than with generate_data1.
> 2. The problem is more likely to occur when the data size is more than 50.
> 3. With the same code and the same data, different runs produce different
> output. Sometimes the output is as expected, sometimes it is not.
>
> Does anybody know why? Or is my understanding of keyed state wrong?
>
> Thanks.
>
>


Re: PyFlink performance and deployment issues

2021-07-08 Thread Xingbo Huang
Hi Wouter,

The JIRA is https://issues.apache.org/jira/browse/FLINK-23309. Choose
`bundle time` based on the e2e latency you can tolerate. As for
`bundle size`, a larger value generally gives better throughput, but it
should not be set too large: downstream operators may then see no output for
a long time, and the pressure during checkpoints becomes too great.
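
The trade-off can be illustrated with a toy model in plain Python. This is
not how the runtime is implemented: `max_buffer_wait` is a made-up helper
that assumes a bundle is flushed either when it holds `bundle_size` elements
or once its oldest element has waited `bundle_time_ms`, and it ignores the
Python processing time and checkpoint-triggered flushes.

```python
def max_buffer_wait(arrivals_ms, bundle_size, bundle_time_ms):
    """Worst-case time (ms) an element sits in the buffer before a flush."""
    worst = 0
    bundle = []  # arrival times of the elements currently buffered
    for t in arrivals_ms:
        # Time-based flush: the oldest buffered element has waited long enough.
        if bundle and t - bundle[0] >= bundle_time_ms:
            worst = max(worst, bundle_time_ms)
            bundle = []
        bundle.append(t)
        # Size-based flush: the bundle is full.
        if len(bundle) >= bundle_size:
            worst = max(worst, t - bundle[0])
            bundle = []
    if bundle:
        # Whatever is left is eventually flushed by the timer.
        worst = max(worst, bundle_time_ms)
    return worst


arrivals = list(range(0, 1000, 10))  # one event every 10 ms
for size in (1, 10, 1000):
    print(size, max_buffer_wait(arrivals, size, bundle_time_ms=1000))
```

In this model the worst-case wait grows from 0 ms to 90 ms to 1000 ms as the
bundle size goes from 1 to 10 to 1000, which is the "no output downstream
for a long time" effect described above.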

Best,
Xingbo

Wouter Zorgdrager wrote on Thu, Jul 8, 2021 at 4:32 PM:

> Hi Xingbo, all,
>
> That is good to know, thank you. Is there any Jira issue I can track? I'm
> curious to follow this progress! Do you have any recommendations with
> regard to these two configuration values, to get somewhat reasonable
> performance?
>
> Thanks a lot!
> Wouter
>

Re: PyFlink performance and deployment issues

2021-07-08 Thread Xingbo Huang
Hi Wouter,

In fact, our users have encountered the same problem. Whenever the
`bundle size` or `bundle time` is reached, the data in the buffer is sent
from the JVM to the PVM (Python VM). The operator then waits for the PVM to
process it and send the results back to the JVM before it emits them to the
downstream operator. This leads to a large delay, especially for small
events, as small messages are hard to process in a pipelined fashion.

I have been working on this problem recently and plan to include the
optimization in release-1.14.

Best,
Xingbo

Wouter Zorgdrager wrote on Thu, Jul 8, 2021 at 3:41 PM:

> Hi Dian, all,
>
> I will come back to the other points asap. However, I’m still confused
> about this performance. Is this what I can expect from PyFlink in terms of
> performance? ~1000ms latency for single events? I also had a very simple
> setup where I sent 1000 events per second to Kafka, and response
> times/latencies were around 15 seconds for single events. I understand there
> is some Python/JVM overhead, but since Flink is so performant, I would
> expect much better numbers. In the current situation, PyFlink would just be
> unusable if you care about latency. Is this something that you expect to be
> improved in the future?
>
> I will verify how this works out for Beam in a remote environment.
>
> Thanks again!
> Wouter
>
>
> On Thu, 8 Jul 2021 at 08:28, Dian Fu  wrote:
>
>> Hi Wouter,
>>
>> 1) Regarding the performance difference between Beam and PyFlink, I guess
>> it’s because you are using an in-memory runner when running it locally in
>> Beam. In that case, the code path is totally different compared to
>> running in a remote cluster.
>> 2) Regarding `flink run`, I’m surprised that it’s running locally.
>> Could you submit a Java job with similar commands to see how it runs?
>> 3) Regarding `flink run-application`, could you share the exception
>> stack?
>>
>> Regards,
>> Dian
>>
>> On Jul 6, 2021, at 4:58 PM, Wouter Zorgdrager wrote:
>>
>> uses
>>
>>
>>


Re: PyFlink performance and deployment issues

2021-07-07 Thread Xingbo Huang
Hi Wouter,
Sorry for the late reply. I will try to answer your questions in detail.

1. >>> Performance problem.
When running a UDF job locally, Beam uses loopback mode to connect back to
the Python process that compiled the job, so the job starts up faster than
in PyFlink, which creates a new Python process to execute the UDF code.

2. >>> However, this command created a local MiniCluster again rather than
submitting it to my remote cluster.
I was able to submit a Python job to a standalone cluster with the
following commands:

./bin/start-cluster.sh
./bin/flink run --target remote \
-m localhost:8086 \
-pyarch /Users/duanchen/venv/venv.zip \
-pyexec venv.zip/venv/bin/python \
--parallelism 1 \
--python /Users/duanchen/sourcecode/pyflink-performance-demo/python/flink/flink-perf-test.py \
--jarfile /Users/duanchen/sourcecode/pyflink-performance-demo/java/target/flink-perf-tests-0.1.jar

So the situation you encountered is very strange.

3. >>> In my second attempt, I tried deploying it to a Kubernetes cluster
using the following command:

When running in Application mode, you must make sure that all paths are
accessible by the JobManager of your application. The path
~/Documents/runtime.py is on your client side, right? You need to use a
path inside your k8s cluster. The documentation does not explain these
implicit requirements well.

4. >>> Lastly, I wondered if it is possible to set a key for events sent to
the KafkaProducer.
You can check whether the Kafka Table Connector [1] meets your needs.


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#features
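
As a rough sketch of what that could look like: the Kafka table connector
lets you choose which columns are written as the record key through the
`key.format` and `key.fields` options. The helper `kafka_sink_ddl`, the
table name, the topic and the columns `user_id` and `payload` are
placeholders invented for this example.

```python
def kafka_sink_ddl(table: str, topic: str, servers: str, key_field: str) -> str:
    """Build a CREATE TABLE statement for a Kafka sink keyed by one column."""
    return f"""
CREATE TABLE {table} (
  user_id STRING,
  payload STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '{topic}',
  'properties.bootstrap.servers' = '{servers}',
  'key.format' = 'json',
  'key.fields' = '{key_field}',
  'value.format' = 'json'
)
"""


ddl = kafka_sink_ddl("MySink", "events", "localhost:9092", "user_id")
print(ddl)
```

The resulting statement would then be passed to `t_env.execute_sql(ddl)` on
a table environment, after which rows inserted into `MySink` carry `user_id`
as the Kafka message key.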

Best,
Xingbo

Wouter Zorgdrager wrote on Tue, Jul 6, 2021 at 4:58 PM:

> Dear community,
>
> I have been struggling a lot with the deployment of my PyFlink job.
> Moreover, the performance seems to be very disappointing, especially the
> latency at low throughput. I have been playing around with configuration
> values, but it has not improved.
> In short, I have a Datastream job with multiple Python operators including
> a ProcessFunction. The job reads from Kafka and writes to Kafka again. For
> single events, E2E latency has been somewhere between 600ms and 2000ms.
> When I'm increasing throughput, latency becomes in the order of seconds.
> This is when I configure my job like this
> config.set_integer("python.fn-execution.bundle.time", 1)
> config.set_integer("python.fn-execution.bundle.size", 1)
> I tried several configuration values, but the results are similar.
> Interestingly, I have a similar Python streaming application written in
> Apache Beam which does have low latency: single events are processed in <
> 30ms. If I recall correctly, they use the same technique of bundling and
> sending to Python processes.
> On the other hand, Beam uses an in-memory runner when running locally
> which might change the situation. I'm not sure how that compares to a local
> Flink MiniCluster.
>
> I hoped that performance might improve when I deploy this on a (remote)
> Flink cluster. Unfortunately, I had a lot of trouble deploying this PyFlink
> job to a remote Flink cluster. In my first attempt, I created a local TM +
> JM setup and tried to deploy it using the "./flink run" command.
> However, this command created a local MiniCluster again rather than
> submitting it to my remote cluster. The full command was:
> ./flink run --target remote \
> -m localhost:8081 \
> -pyarch venv.zip \
> -pyexec venv.zip/venv/bin/python \
> --parallelism 4 \
> --python ~/Documents/runtime.py \
> --jarfile ~/Documents/combined.jar
>
> Note that venv.zip stores all the Python dependencies for my PyFlink job
> whereas combined.jar stores the Java dependencies. I tried several
> variants of this command, but it *never *submitted to my running
> JobManager and always ran it locally.
> In my second attempt, I tried deploying it to a Kubernetes cluster using
> the following command:
>
> ./flink run-application \
> --target kubernetes-application \
> -Dkubernetes.cluster-id=flink-cluster \
> -Dtaskmanager.memory.process.size=4096m \
> -Dkubernetes.taskmanager.cpu=2 \
> -Dkubernetes.service-account=flink-service-account \
> -Dtaskmanager.numberOfTaskSlots=4 \
> -Dkubernetes.container.image=pyflink:latest \
> -pyarch venv.zip \
> -pyexec venv.zip/venv/bin/python \
> --parallelism 4 \
> -py ~/Documents/runtime.py \
> --jarfile ~/Documents/combined.jar
>
> I created the pyflink:latest image by following the documentation here
> [1]. It was unclear to me if I had to include my project files in this
> Docker image.
> When running it like this, it did submit the job to the remote K8s cluster,
> but I got an exception that it could not find my runtime.py file in some
> sort of tmp folder.
>
> Lastly, I wondered if it is possible to set a key for events sent to the
> KafkaProducer. Right now, it seems you can only configure some (static)
> properties and the serializer.
> Is there a workaround to be able to

Re: [VOTE] Release 1.13.1, release candidate #1

2021-05-27 Thread Xingbo Huang
+1 (non-binding)

- verified checksums and signatures
- built from source code
- checked apache-flink source/wheel package content
- ran a Python UDF job

Best,
Xingbo

Dawid Wysakowicz wrote on Thu, May 27, 2021 at 9:45 PM:

> +1 (binding)
>
> - verified signatures and checksums
> - built from sources and ran an example, quickly checked Web UI
> - checked diff of pom.xml and NOTICE files from 1.13.0:
>   - there were no version changes,
>   - checked the updated licenses of javascript dependencies
>
> Best,
>
> Dawid
> On 26/05/2021 11:15, Matthias Pohl wrote:
>
> Hi Dawid,
> +1 (non-binding)
>
> Thanks for driving this release. I checked the following things:
> - downloaded and built source code
> - verified checksums
> - double-checked diff of pom files between 1.13.0 and 1.13.1-rc1
> - did a visual check of the release blog post
> - started cluster and ran jobs (WindowJoin and WordCount); nothing
> suspicious found in the logs
> - verified change FLINK-22866 manually whether the issue is fixed
>
> Best,
> Matthias
>
> On Tue, May 25, 2021 at 3:33 PM Dawid Wysakowicz wrote:
>
>
> Hi everyone,
> Please review and vote on the release candidate #1 for the version 1.13.1,
> as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release and binary convenience releases to be
> deployed to dist.apache.org [2], which are signed with the key with
> fingerprint 31D2DD10BFC15A2D [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "release-1.13.1-rc1" [5],
> * website pull request listing the new release and adding announcement
> blog post [6].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Best,
> Dawid
>
> [1] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350058
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.1-rc1/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4] https://repository.apache.org/content/repositories/orgapacheflink-1422/
> [5] https://github.com/apache/flink/tree/release-1.13.1-rc1
> [6] https://github.com/apache/flink-web/pull/448
>
>


Re: Access Row fields by attribute name rather than by index in PyFlink TableFunction

2021-05-19 Thread Xingbo Huang
Hi Sumeet,

Due to a limitation in the original design of the PyFlink serializers, there
is no way to pass attribute names to Row in row-based operations. In
release-1.14, I am reworking the implementation of the serializers [1].
Once that is done, accessing the fields of a `Row` by attribute name in
row-based operations will be supported [2].

As a workaround in release-1.13, you may need to set the field names of the
Row manually, e.g.
```
def my_table_tranform_fn(x: Row):
    x.set_field_names(['a', 'b', 'c'])
    ...
```

[1] https://issues.apache.org/jira/browse/FLINK-22612
[2] https://issues.apache.org/jira/browse/FLINK-22712
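
If mutating the Row is not desirable, another option (not from this thread,
just a plain-Python idea) is to wrap the positional row in a namedtuple
inside the function. The names `Columns` and `as_named` are made up here,
and the sketch only assumes that the row supports positional indexing such
as x[0], as described in the question.

```python
from collections import namedtuple

# Column names in the same order as the alias(...) call in the pipeline.
Columns = namedtuple("Columns", ["a", "b", "c"])


def as_named(row):
    """Wrap a positionally indexed row in a namedtuple for name-based access."""
    return Columns(*(row[i] for i in range(len(Columns._fields))))


r = as_named([1, "x", 3.0])
print(r.a, r.b, r.c)
```

The column list then lives in a single place, so a reordering of the alias
call only needs one change.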

Best,
Xingbo

Sumeet Malhotra wrote on Wed, May 19, 2021 at 4:45 PM:

> Hi,
>
> According to the documentation for PyFlink Table row based operations [1],
> typical usage is as follows:
>
> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
> def split(x: Row) -> Row:
>     for s in x[1].split(","):
>         yield x[0], s
>
> table.flat_map(split)
>
> Is there any way that row fields inside the UDTF can be accessed by
> their attribute names instead of array index? In my use case, I'm doing the
> following:
>
> raw_data = t_env.from_path('MySource')
> raw_data \
>     .join_lateral(my_table_tranform_fn(raw_data).alias('a', 'b', 'c')) \
>     .flat_map(my_flat_map_fn).alias('x', 'y', 'z') \
>     .execute_insert("MySink")
>
> In the table function `my_flat_map_fn` I'm unable to access the fields of
> the row by their attribute names, i.e., assuming the input argument to the
> table function is x, I cannot access fields as x.a, x.b or x.c; instead I
> have to use x[0], x[1] and x[2]. The error I get is that _fields is not
> populated.
>
> In my use case, the number of columns is very high, and working with
> indexes is error-prone and unmaintainable.
>
> Any suggestions?
>
> Thanks,
> Sumeet
>
>


Re: How to tell between a local mode run vs. remote mode run?

2021-05-05 Thread Xingbo Huang
Hi Yik San,
You can check whether the execution environment in use is a
`LocalStreamEnvironment`. In PyFlink, you can get the class object of the
underlying Java object through py4j. Take a look at the example below; I
hope it helps.
```
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway
from py4j.java_gateway import get_java_class


def test():
    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(
        env, environment_settings=EnvironmentSettings.new_instance()
        .in_streaming_mode().use_blink_planner().build())
    gateway = get_gateway()

    # get the execution environment class
    env_class = table_env._j_tenv.getPlanner().getExecEnv().getClass()

    # get the LocalStreamEnvironment class
    local_stream_environment_class = get_java_class(
        gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment)
    # True when the job runs in a local mini cluster
    print(env_class == local_stream_environment_class)


if __name__ == '__main__':
    test()

```
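
Once such a check is available, the conditional wait from the question can
be kept in one small helper. This is a minimal sketch: `finish` is a
hypothetical name, `local_mode` is assumed to be the boolean produced by a
check like the one above, and `result` is assumed to be an object with a
`wait()` method, such as the value returned by `execute_sql`.

```python
def finish(result, local_mode: bool):
    """Block until the job is done, but only when running in a mini cluster."""
    if local_mode:
        result.wait()
    return result
```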

Yik San Chan wrote on Wed, May 5, 2021 at 12:04 PM:

> Hi,
>
> According to
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/faq/
>
> > When executing jobs in mini cluster(e.g. when executing jobs in IDE)
> ... please remember to explicitly wait for the job execution to finish as
> these APIs are asynchronous.
>
> I hope my program will be able to run in both local mode as well as in
> remote mode. Therefore I hope to do something like:
>
> ```python
> result = ...
> if local_mode:
>   result.wait()
> else:
>   result
> ```
>
> Is there a way to tell if the program is run under local mode vs. remote
> mode?
>
> Best,
> Yik San
>


Re: Extract/Interpret embedded byte data from a record

2021-04-15 Thread Xingbo Huang
Hi Sumeet,

Python row-based operations will be supported in release-1.13. I guess
you are looking at the code of the master branch. Since you are using the
Python Table API, you can use a Python UDF to parse your data. For details
on Python UDFs, you can refer to the doc [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
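
The body of such a UDF could look roughly like the function below. It is a
sketch only: `extract_field` is a made-up name, only the JSON case is
handled, and it assumes the JSON payload is an object. Avro payloads would
additionally need the writer schema and an Avro library, which is left out
here.

```python
import json


def extract_field(result: bytes, encoding: str, field: str):
    """Decode the embedded `Result` payload and return one field as a string."""
    if encoding == "application/json":
        value = json.loads(result.decode("utf-8")).get(field)
        return None if value is None else str(value)
    # Other encodings (e.g. application/avro) are not handled in this sketch.
    return None


print(extract_field(b'{"score": 7}', "application/json", "score"))
```

The plain function can then be wrapped as a scalar UDF with a STRING result
type, as described in the linked documentation, and applied to the `Result`
and `ResultEncoding` columns.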

Best,
Xingbo

Sumeet Malhotra wrote on Thu, Apr 15, 2021 at 9:08 AM:

> Additional observation: From the Flink repo, the file
> "flink-python/pyflink/table/table.py" seems to support map(), flat_map()
> and other row based operations although the 1.12 documentation doesn't
> reflect that. Is that correct? From the code, it appears that these
> operations are supported in Python.
>
> Thanks,
> Sumeet
>
> On Thu, Apr 15, 2021 at 6:31 AM Sumeet Malhotra 
> wrote:
>
>> Thanks Piotrek! I forgot to mention that I'm using PyFlink and mostly
>> Table APIs. The documentation (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#row-based-operations)
>> suggests that Map() function is not currently supported in Python. So, what
>> do you think would be my options here. Should I convert to a data stream to
>> perform this in Python?
>>
>> Thanks again,
>> Sumeet
>>
>>
>> On Wed, Apr 14, 2021 at 7:09 PM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> One thing that you can do is to read this record using Avro keeping
>>> `Result` as `bytes` and in a subsequent mapping function, you could change
>>> the record type and deserialize the result. In Data Stream API:
>>>
>>> source.map(new MapFunction>> record_with_deserialized_result> { ...} )
>>>
>>> Best,
>>> Piotrek
>>>
>>> śr., 14 kwi 2021 o 03:17 Sumeet Malhotra 
>>> napisał(a):
>>>
>>>> Hi,
>>>>
>>>> I'm reading data from Kafka, which is Avro encoded and has the
>>>> following general schema:
>>>>
>>>> {
>>>>   "name": "SomeName",
>>>>   "doc": "Avro schema with variable embedded encodings",
>>>>   "type": "record",
>>>>   "fields": [
>>>>     {
>>>>       "name": "Name",
>>>>       "doc": "My name",
>>>>       "type": "string"
>>>>     },
>>>>     {
>>>>       "name": "ID",
>>>>       "doc": "My ID",
>>>>       "type": "string"
>>>>     },
>>>>     {
>>>>       "name": "Result",
>>>>       "doc": "Result data, could be encoded differently",
>>>>       "type": "bytes"
>>>>     },
>>>>     {
>>>>       "name": "ResultEncoding",
>>>>       "doc": "Result encoding media type (e.g. application/avro, application/json)",
>>>>       "type": "string"
>>>>     }
>>>>   ]
>>>> }
>>>>
>>>> Basically, the "Result" field is bytes whose interpretation depends
>>>> upon the "ResultEncoding" field, i.e. either avro or json. The "Result"
>>>> byte stream also has its own well-defined schema.
>>>>
>>>> My use case involves extracting/aggregating data from within the
>>>> embedded "Result" field. What would be the best approach to perform this
>>>> runtime decoding and extraction of fields from the embedded byte data?
>>>> Would user defined functions help in this case?
>>>>
>>>> Thanks in advance!
>>>> Sumeet



Re: [ANNOUNCE] Release 1.13.0, release candidate #0

2021-04-01 Thread Xingbo Huang
Hi Dawid,

Thanks a lot for the great work! Regarding the flink-python issue, I have
provided a quick fix and will try to get it fully resolved ASAP.

Best,
Xingbo

Dawid Wysakowicz wrote on Fri, Apr 2, 2021 at 4:04 AM:

> Hi everyone,
> As promised I created a release candidate #0 for the version 1.13.0. I am
> not starting a vote for this release as I've created it mainly for
> verifying the release process. We are still aware of some improvements
> coming in shortly. However we will greatly appreciate any help testing this
> RC already. It can help tremendously identifying any problems early.
>
> Unfortunately I was not able to create binary convenience release for
> flink-python, because of a bug in the release scripts which can be tracked
> in https://issues.apache.org/jira/browse/FLINK-22095
>
> The complete staging area is available for your review, which includes:
> * the official Apache source release and binary convenience releases to be
> deployed to dist.apache.org [1], which are signed with the key with
> fingerprint 31D2DD10BFC15A2D [2],
> * all artifacts to be deployed to the Maven Central Repository [3],
> * source code tag "release-1.13.0-rc0" [4],
>
> Your help testing the release will be greatly appreciated!
>
> Thanks,
> Dawid Wysakowicz
>
> [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.0-rc0/
> [2] https://dist.apache.org/repos/dist/release/flink/KEYS
> [3] https://repository.apache.org/content/repositories/orgapacheflink-1417/
> [4] https://github.com/apache/flink/tree/release-1.13.0-rc0
>


Re: PyFlink java.io.EOFException at java.io.DataInputStream.readInt

2021-03-19 Thread Xingbo Huang
Yes, you need to make sure that the key and value types of the returned Map
are fixed and match the declared result type.
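
For the dummy parser in the question, that means every value has to be a
string before the dict is returned. A minimal sketch, where `to_string_map`
is a made-up helper name:

```python
def to_string_map(d):
    """Convert every key and value to str so the dict fits MAP<STRING, STRING>."""
    return {str(k): str(v) for k, v in d.items()}


res = to_string_map({'item_id': 123, 'tag': 'a'})
print(res)  # {'item_id': '123', 'tag': 'a'}
```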

Best,
Xingbo

Yik San Chan wrote on Fri, Mar 19, 2021 at 3:41 PM:

> I figured out the cause for the simplified question: the dummy parser should
> return a map from string keys to string values, otherwise it violates the
> result_type spec.
>
> On Fri, Mar 19, 2021 at 3:37 PM Yik San Chan 
> wrote:
>
>> Hi Dian,
>>
>> I simplified the question in
>> https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readfully.
>> You can also find the updated question below:
>>
>> I have a PyFlink job that reads from a file, filters based on a condition,
>> and prints the result. This is a `tree` view of my working directory. This
>> is the PyFlink script main.py:
>>
>> ```python
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment, DataTypes
>> from pyflink.table.udf import udf
>>
>> # https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
>> # https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html
>>
>> @udf(input_types=[DataTypes.STRING()],
>> result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
>> def parse(s):
>>   import json
>>   # a dummy parser
>>   res = {'item_id': 123, 'tag': 'a'}
>>   return res
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> t_env = StreamTableEnvironment.create(env)
>>
>> t_env.register_function("parse", parse)
>>
>> my_source_ddl = """
>> create table mySource (
>> id BIGINT,
>> contentstr STRING
>> ) with (
>> 'connector' = 'filesystem',
>> 'format' = 'json',
>> 'path' = '/tmp/input'
>> )
>> """
>>
>> my_sink_ddl = """
>> create table mySink (
>> id BIGINT
>> ) with (
>> 'connector' = 'print'
>> )
>> """
>>
>> my_transform_dml = """
>> insert into mySink
>> with t1 as (
>> select id, parse(contentstr) as content
>> from mySource
>> )
>> select id
>> from t1
>> where content['item_id'] is not null
>> and content['tag'] = 'a'
>> """
>>
>> t_env.execute_sql(my_source_ddl)
>> t_env.execute_sql(my_sink_ddl)
>> t_env.execute_sql(my_transform_dml).wait()
>> ```
>>
>> To run the `main.py`:
>> - Ensure pyflink==1.12.0 is installed in my conda env
>> - /tmp/input has a single row of content `{"id":1,"tag":"a"}`
>>
>> Then I run `main.py` and I get the exception:
>>
>> ```
>> Traceback (most recent call last):
>>   File "udf_parse.py", line 53, in 
>> t_env.execute_sql(my_transform_dml).wait()
>>   File
>> "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table_result.py",
>> line 76, in wait
>> get_method(self._j_table_result, "await")()
>>   File
>> "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py",
>> line 1286, in __call__
>> answer, self.gateway_client, self.target_id, self.name)
>>   File
>> "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>> line 147, in deco
>> return f(*a, **kw)
>>   File
>> "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/protocol.py",
>> line 328, in get_return_value
>> format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling o53.await.
>> : java.util.concurrent.ExecutionException:
>> org.apache.flink.table.api.TableException: Failed to wait job finish
>> at
>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>> at
>> org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:123)
>> at
>> org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>> at
>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>> at
>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>> at
>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at
>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.table.api.TableException: Failed to wait job
>> finish
>> at
>> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)
>> at
>> 

Re: pyflink UDTF help needed!

2021-03-18 Thread Xingbo Huang
Hi,

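After investigating, this is indeed a bug. The problem is that Python UDFs used in a sub-query are not handled correctly. I have created a JIRA issue [1]
to track it. The current workaround is to use the Table API.
For details, see the code below: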
>>>
a = t_env.sql_query("""
SELECT
 hotime ,
 before_ta ,
 before_rssi ,
 after_ta ,
 after_rssil ,
 nb_tath ,
 nb_rssith ,
 train_and_predict(nb_tath, nb_rssith) predict
FROM source
""")
result = a.join_lateral("splitStr(predict) as (nbr_rssi, nbr_ta)")


[1] https://issues.apache.org/jira/browse/FLINK-21856

Best,
Xingbo

陈康 <844256...@qq.com> wrote on Thu, Mar 18, 2021 at 1:30 PM:

> apache-flink 1.11.1
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: Some questions about using pyflink

2021-03-17 Thread Xingbo Huang
Hi,

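There are really only two kinds of PyFlink jobs: those that use Python UDFs and those that do not.
1. For jobs without Python UDFs, all the Python code you write is just API-layer calls, which are only responsible for compiling the job on the client. You can consider the job code that actually runs to be exactly the same Java code, all running in the JVM, so there is no performance difference. If you think some operation is slow, you need to analyze the performance of the corresponding Java operator.
2. For jobs with Python UDFs, the body of your UDF is Python code, which the JVM does not understand at runtime. A Python VM (PVM) is needed to execute it, so a Python process is started specifically to run the UDF body, which involves IPC and so on.

I am not sure whether I explained this clearly. In short, if you do not use Python UDFs, it is equivalent to writing a Java Flink job.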

Best,
Xingbo

xiaoyue <18242988...@163.com> wrote on Wed, Mar 17, 2021 at 12:03 PM:

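> Hi, Xingbo
> I would like to ask about the details of how sql_query is executed. When Flink 1.12 executes SQL statements under the hood, is there a predicate pushdown optimization?
> From our test results:
> 1. The connector definition in pyflink 1.11 supports the read.query parameter for fetching data, and it runs very efficiently; I guess that part of the execution is delegated to the database.
> 2. pyflink 1.12 removed the read.query parameter, and when several sources are defined and operations such as join are executed, the time cost is very noticeable (in pyflink).
> So, given the above, I would like to ask whether this time cost is also due to limitations of the Python language or IPC overhead, or whether it is caused by the design of the underlying implementation.
> Thanks~
> At 2021-03-16 14:27:22, "Xingbo Huang" wrote:
> >Hi,
> >
> >Two additional points:
> >1. The Table API currently supports Pandas UDAFs [1] on sliding windows and tumbling windows;
> >support for UDAFs on session windows will come in 1.13. As for window support on DataStream: for the window types above you can convert to a Table and operate there; custom windows on DataStream will be supported in 1.13.
> >
> >2. Regarding performance: if you do not use Python UDFs, what runs is essentially Java code, and Python only serves to compile the JobGraph on the client. So Python
> >sql_update cannot run slower than Java, because the code actually executed is exactly the same. If you do use Python UDFs, then compared with Java UDFs
> >there is the extra IPC communication overhead, and Python itself is slower than Java code.
> >The current performance gap is roughly 6 to 7 times. We keep working on performance, and the goal is to fully catch up with Java code, or even C code.
> >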
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
> >
> >Best,
> >Xingbo
> >
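> >xiaoyue wrote on Tue, Mar 16, 2021 at 11:42 AM:
> >
> >> Hello,
> >> I am also investigating and experimenting with distributed computing using pyflink together with pandas, so I can only share some experience regarding your questions.
> >> pyflink will probably be unified on DataStream in the future, so DataSet will likely no longer be supported;
> >> I do not recommend converting between Table and DataFrame with table.to_pandas() in the middle of a computation, as it hurts efficiency;
> >> The most efficient approach I have found so far is to define pandas-type udf/udaf, but pyflink is still much slower than the same approach with the Java API;
> >> My personal feeling is that most of the time in pyflink is still spent in sql_query: for the same SQL statement, the execution efficiency differs greatly from Java.
> >> The above is only my personal impression; if anything is wrong, corrections from passing experts are welcome~
> >> Also, since we are researching the same area, I hope we can exchange new findings. Thanks and best wishes~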
> >>
> >>
> >>
> >>
> >> xiao...@ysstech.com
> >>
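> >> From: qian he
> >> Sent: 2021-03-14 18:59
> >> To: user-zh-flink
> >> Subject: Some questions about using pyflink
> >> Hello,
> >>
> >> Our project recently wants to use Flink for distributed computing. The project was previously a Python pandas project, and we would like to try migrating it with pyflink. When using DataSet for batch processing, there are no map/reduce functions corresponding to the Java version, so I have the following questions:
> >> 1. Does the Python Flink SDK not support DataSet yet?
> >> 2. Are there any alternatives?
> >> 3. If it is not supported yet, is there a planned timeline?
> >> 4. Why doesn't the Flink Table API support map/reduce operations?
> >> 5. Our project uses DataFrames to process data. Can that be run on Flink for distributed computation? If a DataFrame is converted directly into a Table, the Table does not support map/reduce operations. Are there any good suggestions for migrating a pandas project to Flink?
> >> 6. Why doesn't the DataStream API implement window methods? Will later versions support them?
> >>
> >> Thank you very much. I am very optimistic about Flink and hope the community keeps growing. Thanks for the hard work!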
> >>
>


Re: Some questions about using pyflink

2021-03-16 Thread Xingbo Huang
Hi,

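Two additional points:
1. The Table API currently supports Pandas UDAFs [1] on sliding windows and tumbling windows;
support for UDAFs on session windows will come in 1.13. As for window support on DataStream: for the window types above you can convert to a Table and operate there; custom windows on DataStream will be supported in 1.13.

2. Regarding performance: if you do not use Python UDFs, what runs is essentially Java code, and Python only serves to compile the JobGraph on the client. So Python
sql_update cannot run slower than Java, because the code actually executed is exactly the same. If you do use Python UDFs, then compared with Java UDFs
there is the extra IPC communication overhead, and Python itself is slower than Java code.
The current performance gap is roughly 6 to 7 times. We keep working on performance, and the goal is to fully catch up with Java code, or even C code.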

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions

Best,
Xingbo

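xiaoyue wrote on Tue, Mar 16, 2021 at 11:42 AM:

> Hello,
> I am also investigating and experimenting with distributed computing using pyflink together with pandas, so I can only share some experience regarding your questions.
> pyflink will probably be unified on DataStream in the future, so DataSet will likely no longer be supported;
> I do not recommend converting between Table and DataFrame with table.to_pandas() in the middle of a computation, as it hurts efficiency;
> The most efficient approach I have found so far is to define pandas-type udf/udaf, but pyflink is still much slower than the same approach with the Java API;
> My personal feeling is that most of the time in pyflink is still spent in sql_query: for the same SQL statement, the execution efficiency differs greatly from Java.
> The above is only my personal impression; if anything is wrong, corrections from passing experts are welcome~
> Also, since we are researching the same area, I hope we can exchange new findings. Thanks and best wishes~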
>
>
>
>
> xiao...@ysstech.com
>
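> From: qian he
> Sent: 2021-03-14 18:59
> To: user-zh-flink
> Subject: Some questions about using pyflink
> Hello,
>
>
> Our project recently wants to use Flink for distributed computing. The project was previously a Python pandas project, and we would like to try migrating it with pyflink. When using DataSet for batch processing, there are no map/reduce functions corresponding to the Java version, so I have the following questions:
> 1. Does the Python Flink SDK not support DataSet yet?
> 2. Are there any alternatives?
> 3. If it is not supported yet, is there a planned timeline?
> 4. Why doesn't the Flink Table API support map/reduce operations?
> 5. Our project uses DataFrames to process data. Can that be run on Flink for distributed computation? If a DataFrame is converted directly into a Table, the Table does not support map/reduce operations. Are there any good suggestions for migrating a pandas project to Flink?
> 6. Why doesn't the DataStream API implement window methods? Will later versions support them?
>
> Thank you very much. I am very optimistic about Flink and hope the community keeps growing. Thanks for the hard work!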
>


Re: Can I use PyFlink together with PyTorch/Tensorflow/PyTorch

2021-03-16 Thread Xingbo Huang
Hi Yik San,

Thanks for investigating how PyFlink works together with all these ML libs.
IMO, you could refer to the flink-ai-extended project, which supports
TensorFlow on Flink, PyTorch on Flink, etc. Its repository URL is
https://github.com/alibaba/flink-ai-extended. Flink AI Extended is a
project that extends Flink to various machine learning scenarios and can
be used together with PyFlink. You can also join the group by scanning the
QR code included in the README file.

Best,
Xingbo

Yik San Chan wrote on Mon, Mar 15, 2021 at 11:06 AM:

> Hi community,
>
> I am exploring PyFlink and I wonder if it is possible to use PyFlink
> together with all these ML libs that ML engineers normally use: PyTorch,
> Tensorflow, Scikit Learn, Xgboost, LightGBM, etc.
>
> According to this SO thread
> ,
> PySpark cannot use Scikit Learn directly inside UDF because Scikit Learn
> algorithms are not implemented to be distributed, while Spark runs
> distributedly.
>
> Given PyFlink is similar to PySpark, I guess the answer may be "no". But I
> would love to double check, and to see what I need to do to make PyFlink
> able to define UDFs using these ML libs.
>
>
> (This question is cross-posted on StackOverflow
> https://stackoverflow.com/questions/66631859/can-i-use-pyflink-together-with-pytorch-tensorflow-scikitlearn-xgboost-lightgbm
> )
>
>
> Thanks.
>
>
> Best,
>
> Yik San
>


Re: Got “pyflink.util.exceptions.TableException: findAndCreateTableSource failed.” when running PyFlink example

2021-03-15 Thread Xingbo Huang
Hi,

The problem is that the legacy DataSet-based BatchTableEnvironment you are
using does not support the filesystem connector you declared in the DDL. You
can use the Blink planner to achieve what you need.

>>>
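from pyflink.table import BatchTableEnvironment, EnvironmentSettings
from pyflink.table.expressions import lit

# Create a batch TableEnvironment backed by the Blink planner.
t_env = BatchTableEnvironment.create(
    environment_settings=EnvironmentSettings.new_instance()
    .in_batch_mode().use_blink_planner().build())
t_env._j_tenv.getPlanner().getExecEnv().setParallelism(1)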

my_source_ddl = """
create table mySource (
word VARCHAR
) with (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '/tmp/input'
)
"""

my_sink_ddl = """
create table mySink (
word VARCHAR,
`count` BIGINT
) with (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '/tmp/output'
)
"""

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
.select(tab.word, lit(1).count) \
.execute_insert('mySink').wait()
>>>

Best,
Xingbo

Yik San Chan wrote on Mon, Mar 15, 2021 at 1:26 PM:

> (The question is cross-posted on StackOverflow
> https://stackoverflow.com/questions/66632765/got-pyflink-util-exceptions-tableexception-findandcreatetablesource-failed-w
> )
>
> I am running below PyFlink program (copied from
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html
> )
>
> ```python
> from pyflink.dataset import ExecutionEnvironment
> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
> from pyflink.table.descriptors import Schema, OldCsv, FileSystem
> from pyflink.table.expressions import lit
>
> exec_env = ExecutionEnvironment.get_execution_environment()
> exec_env.set_parallelism(1)
> t_config = TableConfig()
> t_env = BatchTableEnvironment.create(exec_env, t_config)
>
> t_env.connect(FileSystem().path('/tmp/input')) \
> .with_format(OldCsv()
>  .field('word', DataTypes.STRING())) \
> .with_schema(Schema()
>  .field('word', DataTypes.STRING())) \
> .create_temporary_table('mySource')
>
> t_env.connect(FileSystem().path('/tmp/output')) \
> .with_format(OldCsv()
>  .field_delimiter('\t')
>  .field('word', DataTypes.STRING())
>  .field('count', DataTypes.BIGINT())) \
> .with_schema(Schema()
>  .field('word', DataTypes.STRING())
>  .field('count', DataTypes.BIGINT())) \
> .create_temporary_table('mySink')
>
> tab = t_env.from_path('mySource')
> tab.group_by(tab.word) \
>.select(tab.word, lit(1).count) \
>.execute_insert('mySink').wait()
> ```
>
> To verify it works, I did the following in order:
>
> 1. Run `echo -e  "flink\npyflink\nflink" > /tmp/input`
> 1. Run `python WordCount.py`
> 1. Run `cat /tmp/output` and find expected output
>
> Then I changed my PyFlink program a bit to prefer SQL over Table API, but
> I find it doesn't work.
>
> ```python
> from pyflink.dataset import ExecutionEnvironment
> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
> from pyflink.table.descriptors import Schema, OldCsv, FileSystem
> from pyflink.table.expressions import lit
>
> exec_env = ExecutionEnvironment.get_execution_environment()
> exec_env.set_parallelism(1)
> t_config = TableConfig()
> t_env = BatchTableEnvironment.create(exec_env, t_config)
>
> my_source_ddl = """
> create table mySource (
> word VARCHAR
> ) with (
> 'connector' = 'filesystem',
> 'format' = 'csv',
> 'path' = '/tmp/input'
> )
> """
>
> my_sink_ddl = """
> create table mySink (
> word VARCHAR,
> `count` BIGINT
> ) with (
> 'connector' = 'filesystem',
> 'format' = 'csv',
> 'path' = '/tmp/output'
> )
> """
>
> t_env.sql_update(my_source_ddl)
> t_env.sql_update(my_sink_ddl)
>
> tab = t_env.from_path('mySource')
> tab.group_by(tab.word) \
>.select(tab.word, lit(1).count) \
>.execute_insert('mySink').wait()
> ```
>
> Here's the error:
>
> ```
> Traceback (most recent call last):
>   File "WordCount.py", line 38, in 
> .execute_insert('mySink').wait()
>   File
> "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table.py",
> line 864, in execute_insert
> return TableResult(self._j_table.executeInsert(table_path, overwrite))
>   File
> "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py",
> line 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File
> "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py",
> line 162, in deco
> raise java_exception
> pyflink.util.exceptions.TableException: findAndCreateTableSink failed.
> at
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSink(TableFactoryUtil.java:87)

Re: Python StreamExecutionEnvironment from_collection Kafka example

2021-03-15 Thread Xingbo Huang
Hi,

From the error message, I think the problem is that there is no Python
interpreter on your TaskManager machine. You need to install a Python 3.5+
interpreter on the TM machine, and PyFlink must be installed in that Python
environment (pip install apache-flink). For details, refer to the document [1].
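
A quick way to check both conditions on a TaskManager host is sketched below. It is only an illustration using the Python standard library: the function name `check_python_env` is made up here, and the probe (start the interpreter and try `import pyflink`) is similar in spirit to, but not the same as, what Flink does internally.

```python
import shutil
import subprocess


def check_python_env(executable="python"):
    """Report whether `executable` is on PATH and can import pyflink."""
    path = shutil.which(executable)
    if path is None:
        # This is the situation behind: Cannot run program "python"
        return {"interpreter": None, "pyflink_importable": False}
    # Start the interpreter and try to import pyflink; a non-zero exit
    # code means the package is missing in that environment.
    proc = subprocess.run(
        [path, "-c", "import pyflink"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return {"interpreter": path, "pyflink_importable": proc.returncode == 0}


if __name__ == "__main__":
    print(check_python_env("python"))
```

Run it on the TM machine as the same user that runs the TaskManager, since PATH may differ between users.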

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/installation.html

Best,
Xingbo

Robert Cullen wrote on Tue, Mar 16, 2021 at 2:58 AM:

> Okay, I added the jars and fixed that exception. However I have a new
> exception that is harder to decipher:
>
> 2021-03-15 14:46:20
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
> at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Cannot run program "python": error=2, No such 
> file or directory
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
> at 
> org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
> at 
> org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
> at 
> org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
> at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
> at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
> at 
> org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
> at 
> org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
> at 
> org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
> at 
> 

Re: pyflink: how to aggregate data of the same pv with a session window

2021-03-08 Thread Xingbo Huang
Hi,
1.12 does not yet support UDAFs on session windows; support will be provided in 1.13, see JIRA [1] for details.
Also, 1.12 does support ProcessFunction and KeyedProcessFunction; see the code [2] for details.
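
The per-key logic that a KeyedProcessFunction would have to implement by hand for this use case can be sketched in plain Python. This is not PyFlink API: `sessionize` is a hypothetical helper that only shows the gap rule (an event starts a new pv when it arrives at least `gap` after the previous one). In a real job the current session would live in keyed state and a timer would close it.

```python
def sessionize(timestamps, gap):
    """Group event times into sessions separated by at least `gap`."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)   # still inside the current session
        else:
            sessions.append([ts])     # gap reached: start a new pv
    return sessions


# Event times in seconds for one account, with a 60-second gap.
pvs = sessionize([0, 10, 100, 130, 300], gap=60)
print(pvs)
```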

[1] https://issues.apache.org/jira/browse/FLINK-21630
[2]
https://github.com/apache/flink/blob/release-1.12/flink-python/pyflink/datastream/functions.py

Best,
Xingbo

Hongyuan Ma wrote on Mon, Mar 8, 2021 at 7:10 PM:

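> I would like to know too. From the docs, pyflink does not seem to support ProcessFunction yet.
>
>
> On 2021-03-08 19:03, kk wrote:
> hi,all:
>
> Consecutive operations by one account within a period of time count as one pv; once the gap exceeds a threshold, a new pv is recorded. The system needs to consume streaming logs and use them to compute real-time metrics. However, when using a session
> window we cannot use a udaf (user-defined aggregate function) to aggregate logs belonging to the same pv.
> Any advice from those who know would be appreciated. Thanks!!!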
>
> session_window = Session.with_gap("60.second").on("pv_time").alias("w")
> t_env.from_path('source') \
>.window(session_window) \
>.group_by("w,pv_id") \
>.select("pv_id,get_act(act)").insert_into("sink")
>
> <http://apache-flink.147419.n8.nabble.com/file/t1355/infoflow_2021-03-08_19-02-16.png>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: flink1.12 running sql_query() on the same source tables: pyflink takes 9min, java takes 3s

2021-03-07 Thread Xingbo Huang
Hi,

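Sorry for the late reply. Regarding pandas UDAFs, I specifically measured the framework-level overhead (using a plain mean as the function). Compared with Java, the gap is only about 3 to 4 times; see the code [1] for details. As for your code, I suspect the problem is in your function implementation: constructing a DataFrame inside the function adds extra overhead. Why not compute directly on j? Of course, you can also tune some parameters to improve performance, such as python.fn-execution.bundle.size and python.fn-execution.bundle.time; see the documentation [2] for details.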
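
The computation in the quoted UDAF, prod(1 + rate) - 1, does not need an intermediate DataFrame. The sketch below shows the same arithmetic in plain Python on a list of rates; the function name `log_return` is only illustrative. In the pandas UDAF itself, where `j` is a pandas.Series, the analogous expression would presumably be `(j + 1).prod() - 1`; that one-liner is my reading of the suggestion, not code from the thread.

```python
import math


def log_return(rates):
    """Compound return: prod(1 + r) - 1, computed directly on the input."""
    return math.prod(1 + r for r in rates) - 1


print(log_return([0.1, 0.2]))
```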



[1]
https://github.com/HuangXingBo/pyflink-performance-demo/blob/master/python/flink/flink-pandas-udaf-test.py
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-fn-execution-bundle-size

Best,
Xingbo

xiao...@ysstech.com wrote on Tue, Mar 2, 2021 at 1:38 PM:

> Hi,
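> Yes, it is in batch mode. I only run it locally on my own machine, not in cluster mode. Let me paste all the code.
> Python version:
> # Set up the environment (udaf is only supported in batch mode)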
> env_settings =
> EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
> env = BatchTableEnvironment.create(environment_settings=env_settings)
> # Table 1: 10 million rows
>  source_ddl1 = """CREATE TABLE TP_GL_DAY (DAY_ID
> VARCHAR(8),IS_EXCH_DAY DECIMAL
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://ip:port/db?useSSL=False',
> 'driver' = 'com.mysql.cj.jdbc.Driver',
> 'username' = 'root',
> 'password' = 'xxx',
> 'table-name' = 'TP_GL_DAY')
> """
> # Table 2: 700+ rows
> source_ddl2 = """CREATE TABLE TS_PF_SEC_YLDRATE (PF_ID VARCHAR(10),\
> SYMBOL_ID VARCHAR(20),BIZ_DATE VARCHAR(8),\
> CCY_TYPE VARCHAR(10),YLDRATE DECIMAL(18,12)
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://ip:port/db?useSSL=False',
> 'driver' = 'com.mysql.cj.jdbc.Driver',
> 'username' = 'root',
> 'password' = 'xxx',
> 'table-name' = 'TS_PF_SEC_YLDRATE')
> """
># sink
>  print_sink_ddl = """
>   CREATE TABLE print(
> pf_id VARCHAR(10),
> out_put FLOAT
> ) WITH (
>   'connector' = 'print'
> )
> """
> # Source tables
> env.execute_sql(source_ddl1)
> env.execute_sql(source_ddl2)
># sink
>  env.execute_sql(print_sink_ddl)
>
> sql = "SELECT YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN
> TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '123' AND SYMBOL_ID =
> '456' AND BIZ_DATE BETWEEN '20160701' AND '20170307'"
>
> # Get the query result
> query_table = env.sql_query(sql)
> # Run the udaf
> # udaf aggregate function computation
> @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
> def logReturn(i, j):
> df = pd.DataFrame({'pf_id': i, 'yldrate': j})
> df['yldrate1'] = df['yldrate'] + 1
> return np.prod(df['yldrate1']) - 1
> # Execute and print
> result =
> query_table.group_by(query_table.PF_ID).select(query_table.PF_ID,
> logReturn(
> query_table.PF_ID,
>
> query_table.YLDRATE)).execute_insert('print').wait()
>
> Java version:
> The Java side uses a streaming environment:
> StreamExecutionEnvironment streamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings streamSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(streamEnv, streamSettings);
> streamEnv.execute("");
> Computation part:
> On the Java side, queryData is likewise obtained by registering the source tables via connector DDL and then executing SQL.
> tableEnv.registerFunction("add", new addFunction());
> tableEnv.registerFunction("prod", new ProductUdaf());
> Table addedTable = tableEnv.sqlQuery("SELECT pf_id,add(yldrate) as
> yldrate FROM queryData");
> tableEnv.createTemporaryView("addedTable", addedTable);
> Table resultTable = tableEnv.sqlQuery("SELECT
> pf_id,prod(yldrate)-1 as yldrate FROM addedTable group by pf_id");
>
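> The Java version was written by a colleague, but its logic follows the Python logic. In terms of execution time, the Python version takes anywhere from 400 to 500s depending on local CPU usage (never more than 8% per run), and stays around 400s most of the time. My machine is Windows 10
> 64-bit, 16GB RAM, 2.3GHz, 4 cores, 8 logical processors.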
>
>
> xiao...@ysstech.com
>
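> From: Xingbo Huang
> Sent: 2021-03-02 11:59
> To: user-zh
> Subject: Re: Re: flink1.12 running sql_query() on the same source tables: pyflink takes 9min, java takes 3s
> Hi,
>
> First, I assume you are running the Pandas UDAF in batch mode (pandas UDAFs are not supported on unbounded streams).
>
> Then, let me confirm one more thing: you wrote a Java version of the udaf (logReturn) and compared it with the Python version of the udaf you wrote; in terms of time, the Java version is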

Re: Re: flink1.12 running sql_query() on the same source tables: pyflink takes 9min, java takes 3s

2021-03-01 Thread Xingbo Huang
Hi,

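First, I assume you are running the Pandas UDAF in batch mode (pandas UDAFs are not supported on unbounded streams).
Then, let me confirm one more thing: you wrote a Java version of the udaf (logReturn) and compared it with the Python version of the udaf you wrote, and the Java version takes 3s while the Python version takes 8 minutes?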

Best,
Xingbo

xiao...@ysstech.com wrote on Tue, Mar 2, 2021 at 9:57 AM:

> Hi,
> I am using a pandas-type udaf in flink 1.12. The code is as follows:
> @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
> def logReturn(i, j):
> df = pd.DataFrame({'id': i, 'rate': j})
> df['rate1'] = df['rate'] + 1
> return numpy.prod(df['rate1']) - 1
> It is invoked like this:
>  result =
> query_table.group_by(query_table.PF_ID).select(query_table.ID,
>  logReturn(
>  query_table.ID,
>
>  query_table.RATE)).execute_insert('print').wait()
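> This code uses print as the sink. In fact, the earlier to_pandas was also used in the last step, to fetch the computed result and return it;
> The Java code is similar: it defines a udaf, executes it, returns a result of type Table, and then processes that structure to extract the computed result;
> But the execution times differ a lot. I tried many approaches in Python to implement the same simple computation, and they all take about 8 minutes.
> I keep feeling the time is spent on the query. I previously used connector.read.query in flink 1.11 to fetch the data directly, and the computation was very fast~
> I am a beginner and do not know much about Flink's internal design. I hope to find the specific cause here~ Thank you~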
>
>
>
> xiao...@ysstech.com
>
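> From: Xingbo Huang
> Sent: 2021-03-02 09:42
> To: user-zh
> Subject: Re: Re: flink1.12 running sql_query() on the same source tables: pyflink takes 9min, java takes 3s
> Hi,
>
> Are you comparing the performance of a udaf written in Java with a pandas udaf? How did you test it? In what scenario are you using the pandas
> udaf? Also, when you use to_pandas, that is your sink, and that is where the bottleneck is. It is generally used for debugging or writing ITs (integration tests), not as a sink for performance testing or in production.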
>
> Best,
> Xingbo
>
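> xiaoyue <18242988...@163.com> wrote on Mon, Mar 1, 2021 at 10:34 AM:
>
> > Hi, Xingbo
> > Thank you very much for your reply. I convert to pandas because I want to use the matrix computation methods in pandas.
> > The project requirement is to compute with Flink and return the result directly to the front end, so it should be a process with a source but no sink.
> > I also tried defining a pandas-type udaf and returning the aggregated result, but the execution time did not change much.
> >
> > So I would like to ask: for the business scenario just described, is there a suitable way to run it? That is, to use the matrix computation in pandas or
> > numpy while keeping it efficient. Thank you very much~!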
> >
> >
> >
> >
> >
> >
> >
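> > At 2021-03-01 09:54:49, "Xingbo Huang" wrote:
> > >Hi,
> > >
> > >The difference is that you used to_pandas(), which is slow (it has to collect all the data back to the client and then build a Python DataFrame, hence the slowness). to_pandas is generally used for debugging; it is convenient but performs poorly. If performance matters to you, just switch to another sink.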
> > >
> > >Best
> > >Xingbo
> > >
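> > >xiaoyue <18242988...@163.com> wrote on Fri, Feb 26, 2021 at 12:38 PM:
> > >
> > >> I wonder whether anyone has run into this problem: in a streaming environment connected to a MySQL database, I define two source tables, source1 and source2, with DDL.
> > >> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND '20170307'"
> > >> # Get the query result
> > >> query_table = env.sql_query(sql)
> > >> query_table.to_pandas()
> > >> With the same processing, the speeds of Python and Java differ greatly. What causes this?
> > >> Since Python merely wraps the Flink API, could it be the effect of the GIL?
> > >> Waiting for an answer from an expert. Anyone who has hit the same problem is also welcome to discuss, thx!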
> > >>
> > >>
> >
>


Re: Re: flink1.12 running sql_query() on the same source tables: pyflink takes 9min, java takes 3s

2021-03-01 Thread Xingbo Huang
Hi,

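Are you comparing the performance of a udaf written in Java with a pandas udaf? How did you test it? In what scenario are you using the pandas
udaf? Also, when you use to_pandas, that is your sink, and that is where the bottleneck is. It is generally used for debugging or writing ITs (integration tests), not as a sink for performance testing or in production.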

Best,
Xingbo

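xiaoyue <18242988...@163.com> wrote on Mon, Mar 1, 2021 at 10:34 AM:

> Hi, Xingbo
> Thank you very much for your reply. I convert to pandas because I want to use the matrix computation methods in pandas.
> The project requirement is to compute with Flink and return the result directly to the front end, so it should be a process with a source but no sink.
> I also tried defining a pandas-type udaf and returning the aggregated result, but the execution time did not change much.
>
> So I would like to ask: for the business scenario just described, is there a suitable way to run it? That is, to use the matrix computation in pandas or numpy while keeping it efficient. Thank you very much~!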
>
>
>
>
>
>
>
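> At 2021-03-01 09:54:49, "Xingbo Huang" wrote:
> >Hi,
> >
> >The difference is that you used to_pandas(), which is slow (it has to collect all the data back to the client and then build a Python DataFrame, hence the slowness). to_pandas is generally used for debugging; it is convenient but performs poorly. If performance matters to you, just switch to another sink.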
> >
> >Best
> >Xingbo
> >
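> >xiaoyue <18242988...@163.com> wrote on Fri, Feb 26, 2021 at 12:38 PM:
> >
> >> I wonder whether anyone has run into this problem: in a streaming environment connected to a MySQL database, I define two source tables, source1 and source2, with DDL.
> >> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND '20170307'"
> >> # Get the query result
> >> query_table = env.sql_query(sql)
> >> query_table.to_pandas()
> >> With the same processing, the speeds of Python and Java differ greatly. What causes this?
> >> Since Python merely wraps the Flink API, could it be the effect of the GIL?
> >> Waiting for an answer from an expert. Anyone who has hit the same problem is also welcome to discuss, thx!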
> >>
> >>
>


Re: flink1.12 running sql_query() on the same source tables: pyflink takes 9min, java takes 3s

2021-02-28 Thread Xingbo Huang
Hi,

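The difference is that you used to_pandas(), which is slow (it has to collect all the data back to the client and then build a Python DataFrame, hence the slowness). to_pandas is generally used for debugging; it is convenient but performs poorly. If performance matters to you, just switch to another sink.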

Best
Xingbo

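xiaoyue <18242988...@163.com> wrote on Fri, Feb 26, 2021 at 12:38 PM:

> I wonder whether anyone has run into this problem: in a streaming environment connected to a MySQL database, I define two source tables, source1 and source2, with DDL.
> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND '20170307'"
> # Get the query result
> query_table = env.sql_query(sql)
> query_table.to_pandas()
> With the same processing, the speeds of Python and Java differ greatly. What causes this?
> Since Python merely wraps the Flink API, could it be the effect of the GIL?
> Waiting for an answer from an expert. Anyone who has hit the same problem is also welcome to discuss, thx!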
>
>


Re: pyflink1.12 to_pandas error: java.lang.RuntimeException: Failed to fetch next result

2021-02-06 Thread Xingbo Huang
Hi,

You can see this line in the error message:
Caused by: java.sql.SQLSyntaxErrorException: Unknown column 'FULLMV' in
'field list'
It says the failure is caused by your table not having a FULLMV column.

Best,
Xingbo

肖越 <18242988...@163.com> wrote on Sun, Feb 7, 2021 at 10:43 AM:

> Some additional code information.
> Below are the statements being executed:
> query_table = env.sql_query(sql)
> query_table.print_schema()
>
>
> @udf(result_type=DataTypes.FLOAT(), func_type="pandas")
> def udf_test(i):
> i = i.astype('float')
> return i
>
>
> result = query_table.select(query_table.PF_ID, query_table.SYMBOL_ID,
> udf_test(query_table.FULLMV))
> print(result.to_pandas())
> Error message:
> py4j.protocol.Py4JJavaError: An error occurred while calling o86.hasNext.
> : java.lang.RuntimeException: Failed to fetch next result
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77)
> at
> org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:115)
> at
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355)
> at
> org.apache.flink.table.runtime.arrow.ArrowUtils$1.hasNext(ArrowUtils.java:644)
> at
> org.apache.flink.table.runtime.arrow.ArrowUtils$2.hasNext(ArrowUtils.java:666)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Failed to fetch job execution result
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126)
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103)
> ... 16 more
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:172)
> ... 18 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> at
> java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
> at
> java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
> ... 19 more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
> at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
> at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
> 

Re: Can PyFlink's py4j call a Java program I wrote myself?

2021-02-04 Thread Xingbo Huang
Hi,

Do you want to use UDFs written in Java? You can call register_java_function or create_java_temporary_function to register your Java UDFs. See the documentation [1] for details.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions

Best,
Xingbo


On Thu, Feb 4, 2021 at 5:53 PM 瞿叶奇 <389243...@qq.com> wrote:

> How can I call a Java program I wrote myself through PyFlink's py4j?


Re: HELP!!!! Problem loading a Keras model in a PyFlink UDF and registering the function

2021-02-03 Thread Xingbo Huang
Hi,

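You can actually do the loading in the open method, so the model is loaded only once. Loading it in the eval method causes it to be loaded many times.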
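
A minimal sketch of that pattern, under stated assumptions: `load_model` is a hypothetical stand-in for the real Keras loading call, the path "model.h5" is made up, and the fallback `ScalarFunction` stub exists only so the snippet also runs where PyFlink is not installed.

```python
try:
    from pyflink.table.udf import ScalarFunction
except ImportError:
    # Fallback stub so the sketch runs without PyFlink; not the real base class.
    class ScalarFunction(object):
        def open(self, function_context):
            pass

LOAD_COUNT = 0  # counts how often the (hypothetical) model gets loaded


def load_model(path):
    """Hypothetical stand-in for an expensive load, e.g. of a Keras model."""
    global LOAD_COUNT
    LOAD_COUNT += 1
    return lambda x: x * 2  # dummy "model"


class Predict(ScalarFunction):
    def __init__(self, model_path):
        # Only remember the path here; the poster reported trouble
        # when the model itself was loaded in __init__.
        self.model_path = model_path
        self.model = None

    def open(self, function_context):
        # Runs once per function instance, before any eval call.
        self.model = load_model(self.model_path)

    def eval(self, x):
        # Reuses the model loaded in open(); nothing is reloaded per row.
        return self.model(x)


f = Predict("model.h5")  # hypothetical path
f.open(None)
results = [f.eval(v) for v in (1, 2, 3)]
```

Here the model is loaded once for three eval calls. Moving the `load_model` call into `eval` would load it three times.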

Best,
Xingbo

On Thu, Feb 4, 2021 at 9:25 AM 陈康 <844256...@qq.com> wrote:

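> Thanks for the reply. Previously I loaded the Keras model in the __init__ method. Following advice from an expert in the DingTalk group, I now load it in eval at the point of use, and the problem is solved. Thanks!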
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Error when testing a pandas UDF: AttributeError: 'decimal.Decimal' object has no attribute 'isnull'

2021-02-02 Thread Xingbo Huang
Hi,

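The error occurs because the logic of your function has the semantics of an aggregate function, not of a scalar function.
A scalar function is one-in, one-out: the number of inputs and outputs stays the same. A pandas UDF only uses pandas' batching to hand you the data wrapped as a Series, but you still have to keep the number of inputs and outputs equal. For example, if the input is pd.Series([1,2,3]) and you apply +1, the result is pd.Series([2,3,4]); both series have the same length, 3.
Your function, however, aggregates a whole pd.Series into a single result: for the input pd.Series([1,2,3]) the returned result is 6, which is many-in, one-out semantics. For this case you should use a pandas UDAF. Pandas UDAFs are supported starting with release-1.12; see the documentation [1] for details.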
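
The difference can be sketched in plain Python. This is an illustration only: lists stand in for the pandas Series that PyFlink would pass, and `plus_one` and `product` are hypothetical names, with a product chosen as one aggregate that yields 6 for this input.

```python
from functools import reduce
from operator import mul


def plus_one(batch):
    """Scalar semantics: one output per input, so the lengths match."""
    return [v + 1 for v in batch]


def product(batch):
    """Aggregate semantics: the whole batch collapses into a single value."""
    return reduce(mul, batch, 1)


batch = [1, 2, 3]
scalar_out = plus_one(batch)  # same length as the input
agg_out = product(batch)      # one value for the whole batch
```

A function shaped like `product` returns one value for a whole batch, which is why it belongs in a pandas UDAF rather than a pandas scalar UDF.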

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions

Best,
Xingbo

On Wed, Feb 3, 2021 at 11:50 AM 肖越 <18242988...@163.com> wrote:

> # Define the computation function
> @udf(input_types=DataTypes.DECIMAL(38,18,True),
>      result_type=DataTypes.DECIMAL(38,18,True), udf_type="pandas")
> def multi_production(yldrate):
>     yldrate_1 = yldrate + 1
>     return np.prod(yldrate_1) - 1
>
>
> Call: env.sql_query('SELECT multi_production(YLDRATE) FROM query_result')
> Since I could not find a more detailed example on the official site: inside a
> pandas-type UDF, can the data be processed in pandas style?
> [Error message]:
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
> at
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:345)
> at
> org.apache.flink.python.AbstractPythonFunctionRunner.finishBundle(AbstractPythonFunctionRunner.java:230)
> ... 17 more
> Caused by: java.lang.RuntimeException: Error received from SDK harness for
> instruction 2: Traceback (most recent call last):
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py",
> line 167, in _execute
> response = task()
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py",
> line 223, in 
> lambda: self.create_worker().do_instruction(request), request)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py",
> line 352, in do_instruction
> request.instruction_id)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py",
> line 386, in process_bundle
> bundle_processor.process_bundle(instruction_id))
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\bundle_processor.py",
> line 812, in process_bundle
> data.transform_id].process_encoded(data.data)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\bundle_processor.py",
> line 205, in process_encoded
> self.output(decoded_value)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\operations.py",
> line 304, in output
> cython.cast(Receiver,
> self.receivers[output_index]).receive(windowed_value)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\operations.py",
> line 178, in receive
> self.consumer.process(windowed_value)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\operations.py",
> line 92, in process
> self._value_coder_impl.encode_to_stream(self.func(o.value),
> output_stream, True)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\coder_impl.py",
> line 467, in encode_to_stream
> self._value_coder.encode_to_stream(value, out, nested)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\coder_impl.py",
> line 438, in encode_to_stream
> pandas_to_arrow(self._schema, self._timezone, self._field_types, cols))
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py",
> line 35, in pandas_to_arrow
> schema.types[i]) for i in range(0, len(schema))]
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py",
> line 35, in 
> schema.types[i]) for i in range(0, len(schema))]
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py",
> line 27, in create_array
> return pa.Array.from_pandas(s, mask=s.isnull(), type=t)
> AttributeError: 'decimal.Decimal' object has no attribute 'isnull'
>
>
> at
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
> at
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
> at
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249)
> at
> 

Re: pyflink1.11 problem printing UDF results: Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb

2021-02-02 Thread Xingbo Huang
Hi,
The error message says at least 79m is required, and I see your code sets it to 0m, so of course it keeps failing.
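
As a rough sketch of the comparison behind that error message: the helpers below are hypothetical, not part of PyFlink, and parse only the 'm' suffix used in this thread. The 79 comes from the quoted error.

```python
import re

REQUIRED_MB = 79  # minimum taken from the error message in this thread


def to_megabytes(size):
    """Parse sizes such as '0m' or '80m' (only the 'm' suffix is handled)."""
    match = re.fullmatch(r"\s*(\d+)\s*m\s*", size.lower())
    if match is None:
        raise ValueError("unsupported size: %r" % size)
    return int(match.group(1))


def is_enough(size, required_mb=REQUIRED_MB):
    """True if the configured off-heap size meets the stated minimum."""
    return to_megabytes(size) >= required_mb
```

With this check, the '0m' passed to set_string in the quoted code fails, while any value of at least '79m' passes.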
Best,
Xingbo

On Wed, Feb 3, 2021 at 10:24 AM 肖越 <18242988...@163.com> wrote:

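> With pyflink1.11 I am testing a UDF: it takes a DOUBLE column defined in the table as input and computes the cumulative product of the values in that column,
> returning a single DOUBLE value. I have already configured taskmanager.memory.task.off-heap.size, but it has no effect.
> Printing the result fails with: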
> Traceback (most recent call last):
>   File "C:*/udtf_test.py", line 42, in 
> env.execute_sql('INSERT INTO print_result SELECT
> multi_production(YLDRATE) FROM query_result')
>   File
> "C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py",
> line 543, in execute_sql
> return TableResult(self._j_tenv.executeSql(stmt))
>   File
> "C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
> line 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File
> "C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
> line 154, in deco
> raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
> pyflink.util.exceptions.TableException: "The configured Task Off-Heap
> Memory 0 bytes is less than the least required Python worker Memory 79 mb.
> The Task Off-Heap Memory can be configured using the configuration key
> 'taskmanager.memory.task.off-heap.size'."
>
>
> [Code as follows]:
> s_env = StreamExecutionEnvironment.get_execution_environment()
> s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> # s_env.set_parallelism(8)
> env = StreamTableEnvironment.create(s_env,
>
> environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
> env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
> '0m')
> # Register the source tables
> env.execute_sql(get_table_ddl('TP_GL_DAY'))
> env.execute_sql(get_table_ddl('TS_PF_SEC_YLDRATE'))
>
>
> # Register the output table
> out_ddl = '''
> CREATE TABLE print_result (
>  yldrate1 DOUBLE
> ) WITH (
>  'connector' = 'print'
> )
> '''
> env.execute_sql(out_ddl)
> # Define and run the SQL
> log_query = "SELECT BIZ_DATE, YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY
> JOIN TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '1234' "
> view_table = env.sql_query(log_query)
> env.register_table('query_result', view_table)
>
>
> # Define the computation function
> @udf(input_types=DataTypes.DOUBLE(), result_type=DataTypes.DOUBLE(),
>      udf_type="pandas")
> def multi_production(yldrate):
>     yldrate_1 = yldrate + 1
>     return np.prod(yldrate_1) - 1
>
>
> # Register the function
> env.register_function('multi_production', multi_production)
> env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE)
> FROM query_result')
> query_result.print_schema()
> env.execute('my_udf_job')
>
>


Re: Python UDF help: Process died with exit code 0

2021-02-01 Thread Xingbo Huang
Hi,
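This IllegalStateException is not the root cause. Please post the complete log so the problem
can be tracked down, and ideally also post code, trimmed down as far as possible, that reproduces it reliably.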

Best,
Xingbo

On Tue, Jan 26, 2021 at 5:51 PM Appleyuchi wrote:

> I performed the following steps:
> https://yuchi.blog.csdn.net/article/details/112837327
>
>
> Then I got this error:
> java.lang.IllegalStateException: Process died with exit code 0
>
>
> How should I resolve this?
> Thank you!


Re: PyFlink Expected IPC message of type schema but got record batch

2021-02-01 Thread Xingbo Huang
Hi,

Sorry for the late reply. Thanks for reporting this issue which has been
recorded in FLINK-21208[1]. I will fix it as soon as possible.

[1] https://issues.apache.org/jira/browse/FLINK-21208

Best,
Xingbo

On Sun, Jan 31, 2021 at 3:28 PM 苗红宾 wrote:

> Hi:
>
> Hope you are good! I have a question for pyflink, details as below:
>
> Feature: Windows of size 10 minutes that slides by 5 minutes for data
> aggregate, then do something, almost 2GB data per window, 1 million data
> items.
>
> Job params:
>
> bin/yarn-session.sh -s 2 -jm 2048 -tm 48768 \
> -Dyarn.containers.vcores=4 \
> -Dtaskmanager.memory.managed.consumer-weights=DATAPROC:30,PYTHON:70 \
> -Dtaskmanager.memory.managed.fraction=0.7 \
> -Dtaskmanager.memory.task.off-heap.size=5120m \
> -nm $task_name -qu $queue -d
>
>
> Exception msg as below:
>
> Traceback (most recent call last):
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 253, in _execute
> response = task()
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 310, in 
> lambda: self.create_worker().do_instruction(request), request)
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 480, in do_instruction
> getattr(request, request_type), request.instruction_id)
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 515, in process_bundle
> bundle_processor.process_bundle(instruction_id))
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 978, in process_bundle
> element.data)
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 218, in process_encoded
> self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 330, in
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 332, in
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 195, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 73, in
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 627, in decode_from_stream
> yield self._decode_one_batch_from_stream(in_stream,
> in_stream.read_var_int64())
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 638, in _decode_one_batch_from_stream
> return arrow_to_pandas(self._timezone, self._field_types,
> [next(self._batch_reader)])
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 631, in _load_from_stream
> reader = pa.ipc.open_stream(stream)
>   File
> 

Re: pyflink1.11 table.to_pandas() fails with 'Sort on a non-time-attribute field is not supported.'

2021-01-27 Thread Xingbo Huang
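From the error, it looks like you called order_by. For an unbounded
table, order_by can only be applied to a time attribute field; otherwise it has to be followed by a fetch operation. See the documentation [1] for details.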
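
One way to see why a fetch bound helps, sketched in plain Python rather than PyFlink API code: a full sort of an unbounded input can never finish, but keeping only the first N rows in sort order needs just N rows of state. `top_n` is a hypothetical helper written for this illustration.

```python
import heapq


def top_n(stream, n, key):
    """Yield the n smallest rows seen so far after each incoming row."""
    best = []
    for row in stream:
        # Only n rows are retained, however long the stream runs.
        best = heapq.nsmallest(n, best + [row], key=key)
        yield list(best)


rows = [5, 1, 4, 2]
snapshots = list(top_n(rows, 2, key=lambda r: r))
```

Each snapshot holds at most two rows, and the last one holds the two smallest values seen.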

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#orderby-offset--fetch

Best,
Xingbo

On Wed, Jan 27, 2021 at 5:44 PM 肖越 <18242988...@163.com> wrote:

> sql_query returns a table object, and calling table.to_pandas() on it fails with:
> Traceback (most recent call last):
>   File
> "C:/projects/dataService-pyflink_explore/dataService-calculate-code-python/src/test/test_mysql_connector.py",
> line 161, in 
> print(table.to_pandas().head(6))
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table.py",
> line 723, in to_pandas
> .collectAsPandasDataFrame(self._j_table, max_arrow_batch_size)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
> line 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
> line 154, in deco
> raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
> pyflink.util.exceptions.TableException: 'Sort on a non-time-attribute
> field is not supported.'
> Could anyone explain why the conversion fails? print_schema on the table itself works fine.


Re: Python UDF fails when submitted to a local node

2021-01-25 Thread Xingbo Huang
Hi,

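From the error, the `python` interpreter used by your client environment does not have pyflink installed. -pyexec specifies the Python environment used by the workers that run your UDFs, but compiling the job on the client also needs a Python environment, and that environment needs pyflink installed as well.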
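
A small check that can be run with each interpreter in question, as a sketch using only the standard library. `has_module` is a hypothetical helper name; the module names passed to it come from this thread.

```python
import importlib.util
import sys


def has_module(name):
    """Return True if `name` can be found by the current interpreter."""
    try:
        return importlib.util.find_spec(name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package of a dotted name is missing.
        return False


if __name__ == "__main__":
    print("interpreter:", sys.executable)
    print("pyflink:", has_module("pyflink"))
    print("pyflink.common.serialization:",
          has_module("pyflink.common.serialization"))
```

Running it once with the client-side `python` and once with the interpreter given to -pyexec shows whether both environments can see pyflink.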

Best,
Xingbo


On Mon, Jan 25, 2021 at 9:01 PM 陈康 <844256...@qq.com> wrote:

> Hello, I have a question about configuring pyflink; running it locally fails.
> [root@hadoop01 ~]# pip list | grep flink
> apache-flink (1.12.0)
>
> [root@hadoop01 ~]# python3 -V
> Python 3.6.5
>
> flink run -m localhost:8081 -py datastream_tutorial.py -pyexec
> /usr/local/python3/bin/python3
>
>  File "datastream_tutorial.py", line 1, in 
> from pyflink.common.serialization import SimpleStringEncoder
> ModuleNotFoundError: No module named 'pyflink.common.serialization'
> How did you configure the environment variables? Thanks.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-19 Thread Xingbo Huang
Hi meneldor,

Yes. As the first version of Python DataStream, release-1.12 has not yet
covered all scenarios. In release-1.13, we will extend the function of
Python DataStream to cover most scenarios, and CoProcessFunction will
obviously be in it.

Best,
Xingbo

On Tue, Jan 19, 2021 at 4:52 PM meneldor wrote:

> Thank you Xingbo!
>
> Do you plan to implement CoProcess functions too? Right now i cant find a
> convenient method to connect and merge two streams?
>
> Regards
>
> On Tue, Jan 19, 2021 at 4:16 AM Xingbo Huang  wrote:
>
>> Hi meneldor,
>>
>> 1. Yes. Although release 1.12.1 has not been officially released, it is
>> indeed available for download on PyPI.
>> In PyFlink 1.12.1, you only need to `yield` your output in `on_timer`.
>>
>> 2. Whenever an element comes, your `process_element` method will be
>> invoked, so you can directly get the `value` parameter in
>> `process_element`. The firing of the `on_timer` method depends on your
>> registering timer, as you wrote in the example
>> `ctx.timer_service().register_event_time_timer(current_watermark + 1500)`.
>> You might need state access[1] which will be supported in release-1.13. At
>> that time, you can get your state in `on_timer`, so as to conveniently
>> control the output.
>>
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>> .
>>
>> Best,
>> Xingbo
>>
>> On Mon, Jan 18, 2021 at 10:44 PM meneldor wrote:
>>
>>> Thank you Xingbo
>>>
>>> 1. I will try to use normal list instead of named. Thanks!
>>> 2. There is a new 1.12.1 version of pyflink which is using
>>> process_element(self, value, ctx: 'KeyedProcessFunction.Context')
>>>
>>> And what about the on_timer(self, timestamp, ctx:
>>> 'KeyedProcessFunction.OnTimerContext')? Can i access the value as in
>>> process_element() in the ctx for example?
>>>
>>> Thank you!
>>>
>>> On Mon, Jan 18, 2021 at 4:18 PM Xingbo Huang  wrote:
>>>
>>>> Hi Shuiqiang, meneldor,
>>>>
>>>> 1. In fact, there is a problem with using Python `Named Row` as the
>>>> return value of user-defined function in PyFlink.
>>>>
>>>> When serializing a Row data, the serializer of each field is consistent
>>>> with the order of the Row fields. But the field order of Python `Named Row`
>>>> has been sorted by field, and it was designed to better compare Named Row
>>>> and calculate hash values. So this can lead to
>>>> serialization/deserialization errors(The correspondence between serializer
>>>> and field is wrong). It is for performance considerations that serializers
>>>> are not specified according to field name, but `Named Row` support can be
>>>> achieved at the expense of a little performance for ease of use. For the
>>>> current example, I suggest returning a list or a normal Row, instead of a
>>>> Named Row.
>>>>
>>>>
>>>> 2. In pyflink 1.12.0, the method signature of `on_timer` should be `def
>>>> process_element(self, value, ctx: 'KeyedProcessFunction.Context', out:
>>>> Collector)`[1].  If you want to send data in `on_timer`, you can use
>>>> `Collector.collect`. e.g.
>>>> def process_element(self, value, ctx: 'KeyedProcessFunction.Context',
>>>>                     out: Collector):
>>>>     out.collect(Row('a', 'b', 'c'))
>>>>
>>>> 3. >>> I am not sure if the timestamp field should be included in
>>>> output_type_info as i did now.
>>>>
>>>> If you return data with a time_stamp field, `output_type_info` needs to
>>>> have `time_stamp` field. For example, the data returned in your example
>>>> contains `time_stamp`, so your `output_type_info` needs to have the
>>>> information of this field.
>>>>
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/release-1.12.0/flink-python/pyflink/datastream/functions.py#L759
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> On Jan 18, 2021, at 9:21 PM, meneldor wrote:
>>>>
>>>> Actually the *output_type_info* is ok, it was copy/paste typo. I
>>>> changed the function to:
>>>>
>>>> class MyProcessFunction(KeyedProcessFunction):
>>>> def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>>>> yield types.Row(id=ctx.get_current_key()[0], 
>>>> tp=ctx.get_current_key()[1], ac

Re: [ANNOUNCE] Apache Flink 1.12.1 released

2021-01-18 Thread Xingbo Huang
Thanks Xintong for the great work!

Best,
Xingbo

On Tue, Jan 19, 2021 at 12:51 PM Peter Huang wrote:

> Thanks for the great effort to make this happen. It paves the way for us
> to use 1.12 soon.
>
> Best Regards
> Peter Huang
>
> On Mon, Jan 18, 2021 at 8:16 PM Yang Wang  wrote:
>
> > Thanks Xintong for the great work as our release manager!
> >
> >
> > Best,
> > Yang
> >
> > On Tue, Jan 19, 2021 at 11:53 AM Xintong Song wrote:
> >
> >> The Apache Flink community is very happy to announce the release of
> >> Apache Flink 1.12.1, which is the first bugfix release for the Apache
> Flink
> >> 1.12 series.
> >>
> >> Apache Flink® is an open-source stream processing framework for
> >> distributed, high-performing, always-available, and accurate data
> streaming
> >> applications.
> >>
> >> The release is available for download at:
> >> https://flink.apache.org/downloads.html
> >>
> >> Please check out the release blog post for an overview of the
> >> improvements for this bugfix release:
> >> https://flink.apache.org/news/2021/01/19/release-1.12.1.html
> >>
> >> The full release notes are available in Jira:
> >>
> >>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349459
> >>
> >> We would like to thank all contributors of the Apache Flink community
> who
> >> made this release possible!
> >>
> >> Regards,
> >> Xintong
> >>
> >
>


Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-18 Thread Xingbo Huang
Hi meneldor,

1. Yes. Although release 1.12.1 has not been officially released, it is
indeed available for download on PyPI.
In PyFlink 1.12.1, you only need to `yield` your output in `on_timer`.

2. Whenever an element comes, your `process_element` method will be
invoked, so you can directly get the `value` parameter in
`process_element`. The firing of the `on_timer` method depends on your
registering timer, as you wrote in the example
`ctx.timer_service().register_event_time_timer(current_watermark + 1500)`.
You might need state access[1] which will be supported in release-1.13. At
that time, you can get your state in `on_timer`, so as to conveniently
control the output.
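
A minimal sketch of the two points above, assuming the 1.12.1 signatures quoted in this thread (no Collector argument, output via `yield`). `FakeContext` and `FakeTimerService` are hypothetical test doubles, and the fallback base class exists only so the snippet runs without PyFlink installed.

```python
try:
    from pyflink.datastream.functions import KeyedProcessFunction
except ImportError:
    # Fallback stub so the sketch runs without PyFlink; not the real base class.
    class KeyedProcessFunction(object):
        pass


class EmitOnTimer(KeyedProcessFunction):
    def process_element(self, value, ctx):
        # `value` is available here, but not later in on_timer.
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
        yield ("element", value)

    def on_timer(self, timestamp, ctx):
        # Output is produced with `yield`; no Collector is passed in.
        yield ("timer", timestamp)


class FakeTimerService(object):
    """Hypothetical test double that records registered timers."""

    def __init__(self):
        self.timers = []

    def register_event_time_timer(self, ts):
        self.timers.append(ts)


class FakeContext(object):
    """Hypothetical test double for the function context."""

    def __init__(self, ts):
        self._ts = ts
        self._timers = FakeTimerService()

    def timestamp(self):
        return self._ts

    def timer_service(self):
        return self._timers


fn = EmitOnTimer()
ctx = FakeContext(1000)
element_out = list(fn.process_element("a", ctx))
timer_out = list(fn.on_timer(ctx.timer_service().timers[0], ctx))
```

Since `on_timer` receives only the timestamp and the context, anything else it should emit has to be stored somewhere between the two calls, which is where the state access mentioned above comes in.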


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
.

Best,
Xingbo

On Mon, Jan 18, 2021 at 10:44 PM meneldor wrote:

> Thank you Xingbo
>
> 1. I will try to use normal list instead of named. Thanks!
> 2. There is a new 1.12.1 version of pyflink which is using process_element
> (self, value, ctx: 'KeyedProcessFunction.Context')
>
> And what about the on_timer(self, timestamp, ctx:
> 'KeyedProcessFunction.OnTimerContext')? Can i access the value as in
> process_element() in the ctx for example?
>
> Thank you!
>
> On Mon, Jan 18, 2021 at 4:18 PM Xingbo Huang  wrote:
>
>> Hi Shuiqiang, meneldor,
>>
>> 1. In fact, there is a problem with using Python `Named Row` as the
>> return value of user-defined function in PyFlink.
>>
>> When serializing a Row data, the serializer of each field is consistent
>> with the order of the Row fields. But the field order of Python `Named Row`
>> has been sorted by field, and it was designed to better compare Named Row
>> and calculate hash values. So this can lead to
>> serialization/deserialization errors(The correspondence between serializer
>> and field is wrong). It is for performance considerations that serializers
>> are not specified according to field name, but `Named Row` support can be
>> achieved at the expense of a little performance for ease of use. For the
>> current example, I suggest returning a list or a normal Row, instead of a
>> Named Row.
>>
>>
>> 2. In pyflink 1.12.0, the method signature of `on_timer` should be `def
>> process_element(self, value, ctx: 'KeyedProcessFunction.Context', out:
>> Collector)`[1].  If you want to send data in `on_timer`, you can use
>> `Collector.collect`. e.g.
>> def process_element(self, value, ctx: 'KeyedProcessFunction.Context',
>>                     out: Collector):
>>     out.collect(Row('a', 'b', 'c'))
>>
>> 3. >>> I am not sure if the timestamp field should be included in
>> output_type_info as i did now.
>>
>> If you return data with a time_stamp field, `output_type_info` needs to
>> have `time_stamp` field. For example, the data returned in your example
>> contains `time_stamp`, so your `output_type_info` needs to have the
>> information of this field.
>>
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.12.0/flink-python/pyflink/datastream/functions.py#L759
>>
>> Best,
>> Xingbo
>>
>> On Jan 18, 2021, at 9:21 PM, meneldor wrote:
>>
>> Actually the *output_type_info* is ok, it was copy/paste typo. I changed
>> the function to:
>>
>> class MyProcessFunction(KeyedProcessFunction):
>>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>>         yield types.Row(id=ctx.get_current_key()[0],
>>                         tp=ctx.get_current_key()[1], account="TEST",
>>                         device_ts=value[3][2], timestamp=ctx.timestamp())
>>         ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
>>
>>     def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
>>         yield types.Row(id=ctx.get_current_key()[0],
>>                         tp=ctx.get_current_key()[1], account="TEST",
>>                         device_ts=11, timestamp=timestamp)
>>
>> And the type to:
>>
>> output_type_info = Types.ROW_NAMED(
>>     ['id', 'tp', 'account', 'device_ts', 'timestamp'],
>>     [Types.STRING(), Types.STRING(), Types.STRING(), Types.LONG(), Types.LONG()])
>>
>> I cant return the same data in *on_timer()* because there is no value
>> parameter. Thats why i hardcoded *device_ts*. However the exception
>> persists.
>>
>> I am not sure if the timestamp field should be included in 
>> *output_type_info* as i did now.
>>
>> Regards
>>
>>
>> On Mon, Jan 18, 2021 at 2:57 PM Shuiqiang Chen 
>> wrote:
>>
>>> Hi meneldor,
>>>
>>> Actually, the return type of the on_timer() must be the same as
>>> process_element(). It seems that the yield value of process_element() is
>>> missing the `timestamp` field.  And the `output_type_info` has four field
>>> names but with 5 field types. Could you align them?
>>>
>>> Best,
>>> Shuiqiang
>>>
>>
>>


Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-18 Thread Xingbo Huang
Hi Shuiqiang, meneldor,

1. In fact, there is a problem with using Python `Named Row` as the return 
value of user-defined function in PyFlink.

When serializing a Row data, the serializer of each field is consistent 
with the order of the Row fields. But the field order of Python `Named Row` has 
been sorted by field, and it was designed to better compare Named Row and 
calculate hash values. So this can lead to serialization/deserialization 
errors(The correspondence between serializer and field is wrong). It is for 
performance considerations that serializers are not specified according to field 
name, but `Named Row` support can be achieved at the expense of a little 
performance for ease of use. For the current example, I suggest returning a 
list or a normal Row, instead of a Named Row.
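
The mismatch can be illustrated in plain Python. This is a sketch with made-up sample values; the type labels are plain strings standing in for the per-position serializers, and the field names are the ones from the example in this thread.

```python
# Serializers are assigned by declared position.
declared = ["id", "tp", "account", "device_ts", "timestamp"]
serializers = ["STRING", "STRING", "STRING", "LONG", "LONG"]

# Made-up sample values for one row.
values = {"id": "k1", "tp": "t1", "account": "TEST",
          "device_ts": 11, "timestamp": 1500}

# A named Row that sorts its fields by name emits values in this order.
sorted_fields = sorted(values)
sorted_values = [values[f] for f in sorted_fields]

# Pair each positional serializer with the value that actually arrives there.
pairs = list(zip(serializers, sorted_values))
mismatches = [(s, v) for s, v in pairs
              if (s == "STRING") != isinstance(v, str)]
```

After sorting, an integer lands in a STRING slot and a string lands in a LONG slot. A list or a positional Row keeps the declared order and avoids this.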


2. In pyflink 1.12.0, the method signature of `on_timer` should be `def 
process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: 
Collector)`[1].  If you want to send data in `on_timer`, you can use 
`Collector.collect`. e.g.

def process_element(self, value, ctx: 'KeyedProcessFunction.Context',
                    out: Collector):
    out.collect(Row('a', 'b', 'c'))

3. >>> I am not sure if the timestamp field should be included in 
output_type_info as i did now.

If you return data with a time_stamp field, `output_type_info` needs to 
have `time_stamp` field. For example, the data returned in your example 
contains `time_stamp`, so your `output_type_info` needs to have the information 
of this field.


[1] 
https://github.com/apache/flink/blob/release-1.12.0/flink-python/pyflink/datastream/functions.py#L759

Best,
Xingbo

> On Jan 18, 2021, at 9:21 PM, meneldor wrote:
> 
> Actually the output_type_info is ok, it was copy/paste typo. I changed the 
> function to:
> class MyProcessFunction(KeyedProcessFunction):
>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>         yield types.Row(id=ctx.get_current_key()[0],
>                         tp=ctx.get_current_key()[1], account="TEST",
>                         device_ts=value[3][2], timestamp=ctx.timestamp())
>         ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
>
>     def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
>         yield types.Row(id=ctx.get_current_key()[0],
>                         tp=ctx.get_current_key()[1], account="TEST",
>                         device_ts=11, timestamp=timestamp)
> And the type to:
> output_type_info = Types.ROW_NAMED(
>     ['id', 'tp', 'account', 'device_ts', 'timestamp'],
>     [Types.STRING(), Types.STRING(), Types.STRING(), Types.LONG(), Types.LONG()])
> I cant return the same data in on_timer() because there is no value 
> parameter. Thats why i hardcoded device_ts. However the exception persists. 
> I am not sure if the timestamp field should be included in output_type_info 
> as i did now.
> Regards
> 
> On Mon, Jan 18, 2021 at 2:57 PM Shuiqiang Chen  > wrote:
> Hi meneldor, 
> 
> Actually, the return type of the on_timer() must be the same as 
> process_element(). It seems that the yield value of process_element() is 
> missing the `timestamp` field.  And the `output_type_info` has four field 
> names but with 5 field types. Could you align them?
> 
> Best,
> Shuiqiang



Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-14 Thread Xingbo Huang
Hi meneldor,

I guess Shuiqiang was not using PyFlink 1.12.0 when developing the example.
The signature of the `process_element` method has been changed in the new
version [1]. In PyFlink 1.12.0, you can use `collector.collect` to send out
your results.

[1] https://issues.apache.org/jira/browse/FLINK-20647

Best,
Xingbo

meneldor wrote on Fri, Jan 15, 2021 at 1:20 AM:

> Thank you for the answer Shuiqiang!
> Im using the last apache-flink version:
>
>> Requirement already up-to-date: apache-flink in
>> ./venv/lib/python3.7/site-packages (1.12.0)
>
> however the method signature is using a collector:
>
> [image: image.png]
>  Im using the *setup-pyflink-virtual-env.sh* shell script from the
> docs(which uses pip).
>
> Regards
>
> On Thu, Jan 14, 2021 at 6:47 PM Shuiqiang Chen 
> wrote:
>
>> Hi meneldor,
>>
>> The main cause of the error is a bug in
>> `ctx.timer_service().current_watermark()`. At the beginning of the stream,
>> when the first record comes into KeyedProcessFunction.process_element(),
>> the current_watermark is Long.MIN_VALUE on the Java side, while on
>> the Python side it becomes Long.MAX_VALUE, which is 9223372036854775807.
>>
>> >>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>>
>> Here, 9223372036854775807 + 1500 is 9223372036854777307, which Python
>> handles automatically as an arbitrary-precision integer but which overflows
>> a Java Long when the registered timer value is deserialized. I
>> will create an issue to fix the bug.
>>
>> Let's return to your initial question: in PyFlink you can create Row
>> data as below:
>>
>> >>> row_data = Row(id='my id', data='some data', timestamp=)
>>
>> I also wonder which release version of Flink your code snippet is
>> based on. The latest API for KeyedProcessFunction.process_element() and
>> KeyedProcessFunction.on_timer() does not provide a `collector` to collect
>> output data but uses `yield`, which is a more Pythonic approach.
>>
>> Please refer to the following code:
>>
>> def keyed_process_function_example():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     env.set_parallelism(1)
>>     env.get_config().set_auto_watermark_interval(2000)
>>     env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>     data_stream = env.from_collection(
>>         [(1, 'hello', '1603708211000'),
>>          (2, 'hi', '1603708224000'),
>>          (3, 'hello', '1603708226000'),
>>          (4, 'hi', '1603708289000')],
>>         type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))
>>
>>     class MyTimestampAssigner(TimestampAssigner):
>>
>>         def extract_timestamp(self, value, record_timestamp) -> int:
>>             return int(value[2])
>>
>>     class MyProcessFunction(KeyedProcessFunction):
>>
>>         def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>>             yield Row(id=ctx.get_current_key()[1], data='some_string',
>>                       timestamp=)
>>             # current_watermark = ctx.timer_service().current_watermark()
>>             ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
>>
>>         def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
>>             yield Row(id=ctx.get_current_key()[1],
>>                       data='current on timer timestamp: ' + str(timestamp),
>>                       timestamp=timestamp)
>>
>>     output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
>>                                        [Types.STRING(), Types.STRING(), Types.INT()])
>>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>>         .with_timestamp_assigner(MyTimestampAssigner())
>>     data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>>         .key_by(lambda x: (x[0], x[1]),
>>                 key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
>>         .process(MyProcessFunction(), output_type=output_type_info).print()
>>     env.execute('test keyed process function')
>>
>>
>> Best,
>> Shuiqiang
>>
>>
>>
>>
>>
>> meneldor wrote on Thu, Jan 14, 2021 at 10:45 PM:
>>
>>> Hello,
>>>
>>> What is the correct way to use Python dicts as ROW type in PyFlink? I'm
>>> trying this:
>>>
>>> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
>>>                                    [Types.STRING(), Types.STRING(), Types.LONG()])
>>>
>>> class MyProcessFunction(KeyedProcessFunction):
>>>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context',
>>>                         out: Collector):
>>>         result = {"id": ctx.get_current_key()[0], "data": "some_string",
>>>                   "timestamp": }
>>>         out.collect(result)
>>>         current_watermark = ctx.timer_service().current_watermark()
>>>         ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>>>
>>> def on_timer(self, timestamp, ctx: 
>>> 
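
The overflow described in Shuiqiang's reply in this thread can be reproduced with plain arithmetic. The sketch below is only an illustration in standard Python; `fits_in_java_long` is a hypothetical helper, not a PyFlink function. It shows that the sum is a valid Python integer but lies outside the signed 64-bit range of a Java long.

```python
JAVA_LONG_MIN = -(2 ** 63)
JAVA_LONG_MAX = 2 ** 63 - 1


def fits_in_java_long(value):
    """True if value can be stored in a signed 64-bit Java long."""
    return JAVA_LONG_MIN <= value <= JAVA_LONG_MAX


watermark = 9223372036854775807  # value seen on the Python side, per the thread
timer = watermark + 1500         # Python ints are arbitrary precision
print(timer, fits_in_java_long(timer))
```

This is why the example in the thread registers the timer relative to `ctx.timestamp()` instead of the current watermark.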

Re: pyflink-udaf

2021-01-04 Thread Xingbo Huang
Hi,

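I don't see the attachment you mentioned. As for the error, I already replied in your previous email: please check whether your weighted_avg
has been registered (you can register it with create_temporary_system_function or register_function, after which it can be referenced by name in a string expression). If you use the DSL style directly (as in the documentation example), no registration is needed.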

Best,
Xingbo

hepeitan wrote on Mon, Jan 4, 2021 at 8:48 PM:

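> Hello:
>   Our project team plans to use PyFlink UDAFs to aggregate data. From the community articles we learned that Vectorized
> Aggregate Functions are recommended for batch or windowed aggregation.
> However, the code provided for this case is incomplete; it is not a complete example.
> Our own test program fails with: "org.apache.flink.table.api.ValidationException: Undefined
> function: weighted_avg". The test code is attached.
>   Could you provide a complete example of aggregating data with a PyFlink UDAF? Many thanks!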
>
>
>
>


Re: pyflink-udaf

2021-01-04 Thread Xingbo Huang
Hi,

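Hello, the example already covers registering and using the UDF; only reading the source and writing the output are left out (those are covered in separate sections).
As for your error, since you didn't show how exactly you use the function, I can only guess that you used the string-expression style rather than the DSL style of the example, but did not register the function, which caused this error.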

Best,
Xingbo

消息室 wrote on Mon, Jan 4, 2021 at 8:10 PM:

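> Hello:
> Our project team plans to use PyFlink UDAFs to aggregate data. From the community articles we learned that Vectorized
> Aggregate Functions are recommended for batch or windowed aggregation. However, the code provided for this case is incomplete; it is not a complete example.
> Our own test program fails with: "org.apache.flink.table.api.ValidationException: Undefined
> function: weighted_avg"
> Could you provide a complete example of aggregating data with a PyFlink UDAF? Many thanks!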
>
>
>
>
>   


Re: pyflink-1.12.0 stream api job execution failed

2021-01-04 Thread Xingbo Huang
Hi,

Judging from the error, the PyFlink version used on your cluster is 1.11 (the error "No logging endpoint
provided." only exists in 1.11). You can try upgrading it to 1.12.

Best,
Xingbo

゛无邪 <17379152...@163.com> wrote on Mon, Jan 4, 2021 at 4:28 PM:

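> Hi, hello!
> We wrote a Python script following the DataStream API user guide in the Python API section of the official Flink website. Documentation URL:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/python/datastream-api-users-guide/operators.html
> Flink runs on YARN. We specified the script with the -py option and it is submitted to YARN successfully, but then we hit the following error: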
> Job has been submitted with JobID ee9e3a89eae69f457b81d1ebf4a45264
> Traceback (most recent call last):
>   File "official_example_2blk.py", line 44, in 
> env.execute("tutorial_job")
>   File
> "/usr/local/service/flink-1.12.0/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py",
> line 623, in execute
>   File
> "/usr/local/service/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> line 1286, in __call__
>   File
> "/usr/local/service/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line 147, in deco
>   File
> "/usr/local/service/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
> : java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: ee9e3a89eae69f457b81d1ebf4a45264)
> The complete stack trace is in the attachment. Please help!
>


Re: [ANNOUNCE] Apache Flink Stateful Functions 2.2.2 released

2021-01-03 Thread Xingbo Huang
@Gordon Thanks a lot for the release and for being the release manager.
And thanks to everyone who made this release possible!

Best,
Xingbo

Till Rohrmann wrote on Sun, Jan 3, 2021 at 8:31 PM:

> Great to hear! Thanks a lot to everyone who helped make this release
> possible.
>
> Cheers,
> Till
>
> On Sat, Jan 2, 2021 at 3:37 AM Tzu-Li (Gordon) Tai 
> wrote:
>
> > The Apache Flink community released the second bugfix release of the
> > Stateful Functions (StateFun) 2.2 series, version 2.2.2.
> >
> > *We strongly recommend all users to upgrade to this version.*
> >
> > *Please check out the release announcement:*
> > https://flink.apache.org/news/2021/01/02/release-statefun-2.2.2.html
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Maven artifacts for Stateful Functions can be found at:
> > https://search.maven.org/search?q=g:org.apache.flink%20statefun
> >
> > Python SDK for Stateful Functions published to the PyPI index can be
> > found at:
> > https://pypi.org/project/apache-flink-statefun/
> >
> > Official Dockerfiles for building Stateful Functions Docker images can be
> > found at:
> > https://github.com/apache/flink-statefun-docker
> >
> > Alternatively, Ververica has volunteered to make Stateful Functions
> > images available for the community via their public Docker Hub registry:
> > https://hub.docker.com/r/ververica/flink-statefun
> >
> > The full release notes are available in Jira:
> >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12349366
> >
> > We would like to thank all contributors of the Apache Flink community who
> > made this release possible!
> >
> > Cheers,
> > Gordon
> >
>


Re: flink1.12 error OSError: Expected IPC message of type schema but got record batch

2021-01-03 Thread Xingbo Huang
Hi,
Sorry for the late reply. Your error occurs while the data is being deserialized, before the body of your function is even reached. Your pandas
udf declaration looks fine, so we need to look at how you use it. I wrote a simplified version that takes Array[String] as input and returns it as output, and it runs without problems.

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf


def extract_data():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
        .in_streaming_mode().use_blink_planner().build())

    @udf(result_type=DataTypes.ARRAY(DataTypes.STRING()), func_type="pandas")
    def get_string_element(my_list):
        return my_list

    t = t_env.from_elements(
        [("1", ["1", "2", "3"]), ("3", ["2", "3", "3"]), ("2", ["1", "4", "3"])],
        DataTypes.ROW(
            [DataTypes.FIELD("Key", DataTypes.STRING()),
             DataTypes.FIELD("List_element", DataTypes.ARRAY(DataTypes.STRING()))]))
    print(t.select(get_string_element(t.List_element)).to_pandas())


if __name__ == '__main__':
    extract_data()

Please check whether your usage is similar, or whether running this demo also fails for you.

Best,
Xingbo


咿咿呀呀 <201782...@qq.com> wrote on Mon, Jan 4, 2021 at 9:38 AM:

> Experts in the community, has anyone run into this problem? Please advise.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: pyflink: how to set the csv delimiter to "||"

2020-12-29 Thread Xingbo Huang
Hi,

csv.field-delimiter only accepts a single character; see the documentation [1] for details.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/csv.html#csv-field-delimiter

Best,
Xingbo
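
Since the format option only accepts a single character, one possible workaround (an assumption, not something proposed in this thread) is to read each line as a single string column and split it in Python, for example inside a UDF. The helper below is a hypothetical plain-Python sketch of the splitting step only; it does not handle quoting or escaping.

```python
def split_double_pipe(line, delimiter="||"):
    """Split one text line on a multi-character delimiter."""
    return line.rstrip("\r\n").split(delimiter)


print(split_double_pipe("1||hello||2020-12-30\n"))
```

A single "|" inside a field is left untouched, because only the full two-character delimiter is matched.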

消息室 wrote on Wed, Dec 30, 2020 at 12:04 PM:

>
> With format='csv' I want to set the delimiter to "||", configured as:
>
>
> Error:
> Caused by: org.apache.flink.table.api.ValidationException: Option
> 'csv.field-delimiter' must be a string with single character, but was: \|\|
>
>
> How can I set the delimiter to "||"?
>
>
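> -- Original Message --
> *From:* "Dian Fu" ;
> *Sent:* Thursday, Dec 17, 2020, 3:12 PM
> *To:* "user-zh";
> *Cc:* "消息室";
> *Subject:* Re: Question: does the PyFlink sink support a redis connector?
>
> Yes, it needs to be packaged as a jar before it can be used in PyFlink:
> 1) You need to build a fat jar with all dependencies shaded into it. The default build is not a fat jar, so the pom file has to be modified; you can follow what the Kafka connector does
> [1].
> 2) As for usage, the available properties are listed here [2]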
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-sql-connector-kafka/pom.xml#L46
> [2]
> https://github.com/apache/bahir-flink/blob/f0b3e1e04930b79b277cfc7ebe3552db246578e9/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java
>
>
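> On Dec 17, 2020, at 11:55 AM, magichuang wrote:
>
> hi,
>
> I'd like to ask whether
> https://github.com/apache/bahir-flink/tree/master/flink-connector-redis
> can be packaged as a jar and then used in PyFlink.
>
> I'm not familiar with Java, and that page only explains how to use it from Java and Scala.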
>
>
>
>
>
>
>
> Best,
>
> MagicHuang
>
>
>
>
> -- Original Message --
> From: "Dian Fu" 
> Sent: 2020-12-17 10:16:13
> To: user-zh ,hepei...@qq.com
> Cc:
> Subject: Re: Question: does the PyFlink sink support a redis connector?
>
> Thanks to Xingbo for the reply. One small addition: all connectors supported by Table & SQL can be used in PyFlink.
>
> The redis connector is not provided directly in the Flink code base. There is one here that should also work:
> https://github.com/apache/bahir-flink/tree/master/flink-connector-redis
>
> For how to use connectors in PyFlink, see the documentation:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html
>
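> On Dec 17, 2020, at 9:52 AM, Xingbo Huang wrote:
>
> Hi,
>
> As far as I know, Flink does not provide official support for a redis connector [1]. You need to implement your own redis
> connector based on the official interfaces. For how to write a custom connector, see the documentation [2]
>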
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html
>
> Best,
> Xingbo
>
>
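> 消息室 wrote on Thu, Dec 17, 2020 at 9:33 AM:
>
> Hello:
>
>
> Our project team plans to use PyFlink, and I had the pleasure of reading your blog. I would like to ask whether the sink in the current PyFlink 1.12.0 supports a redis
> connector. Thanks!
>  If not, what approach would you suggest?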
>
>
>
>
>
>
>
>


Re: PyFflink UDF Permission Denied

2020-12-28 Thread Xingbo Huang
Hi Andrew,

Sorry for the late reply. It seems that I misunderstood your description.
The script `pyflink-udf-runner.sh` that you need to check is in the `bin`
directory of the `pyflink` package installed for the Python interpreter you
are using. You can execute the following command to find the path:

python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/bin')"

As for "The code runs fine if I do not include the UDF.": when a Python UDF
is used, `pyflink-udf-runner.sh` is needed to start a Python process that
executes the Python UDF code. If you don't use a Python UDF, everything
runs in the JVM.

Best,
Xingbo

Andrew Kramer wrote on Mon, Dec 28, 2020 at 8:29 PM:

> Hi Xingbo,
>
> That file does not exist on the file system.
>
> Thanks,
> Andrew
>
> On Monday, 28 December 2020, Xingbo Huang  wrote:
>
>> Hi Andrew,
>>
>> According to the error, you can try to check the file permission of
>>
>> "/test/python-dist-78654584-bda6-4c76-8ef7-87b6fd256e4f/python-files/site-packages/site-packages/pyflink/bin/pyflink-udf-runner.sh"
>>
>> Normally, the permission of this script would be
>> -rwxr-xr-x
>>
>> Best,
>> Xingbo
>>
>> Andrew Kramer wrote on Sun, Dec 27, 2020 at 10:29 PM:
>>
>>> Hi,
>>>
>>> I am using Flink in Zeppelin and trying to execute a UDF defined in
>>> Python.
>>>
>>> The problem is I keep getting the following permission denied error in
>>> the log:
>>> Caused by:
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
>>> java.io.IOException: Cannot run program
>>> "/test/python-dist-78654584-bda6-4c76-8ef7-87b6fd256e4f/python-files/site-packages/site-packages/pyflink/bin/pyflink-udf-runner.sh":
>>> error=13, Permission denied at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
>>> at
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:447)
>>> at
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:432)
>>> at
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:299)
>>> at
>>> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:417)
>>> ... 14 more
>>>
>>> The code runs fine if I do not include the UDF. I have modified the java
>>> properties to use /test instead of /tmp
>>>
>>> Any thoughts?
>>>
>>> Thanks,
>>> Andrew
>>>
>>


Re: flink1.12 error OSError: Expected IPC message of type schema but got record batch

2020-12-28 Thread Xingbo Huang
Hi,

I tried this version of pyarrow and ran the existing tests, but could not reproduce the problem. Could you share the content of your pandas udf?

Best,
Xingbo

小学生 <201782...@qq.com> wrote on Mon, Dec 28, 2020 at 3:07 PM:

> Hello, the pyarrow version I'm using is 0.17.1.


Re: Unsubscribe from the mailing list

2020-12-27 Thread Xingbo Huang
Hi,

To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org

For details, see [1]

[1] https://flink.apache.org/zh/community.html#section-1

Best,
Xingbo

㊣ 俊 猫 ㊣ <877144...@qq.com> wrote on Sun, Dec 27, 2020 at 11:15 AM:

> Hello, I would like to unsubscribe.


Re: flink1.12 error OSError: Expected IPC message of type schema but got record batch

2020-12-27 Thread Xingbo Huang
Hi,

Your error comes from pyarrow failing while deserializing data. Could you tell us which pyarrow version you are using?
You can check it with `pip list | grep pyarrow`.

Best,
Xingbo
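
As an alternative to `pip list | grep pyarrow`, the version can be read from inside the same interpreter that runs the job. This is a sketch that assumes Python 3.8 or later (for `importlib.metadata`); `installed_version` is a hypothetical helper name.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of a package, or None if absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


print("pyarrow:", installed_version("pyarrow"))
```

Running it with the interpreter passed to the job avoids checking a different environment by mistake.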


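小学生 <201782...@qq.com> wrote on Mon, Dec 28, 2020 at 10:37 AM:

> A question for everyone: after using a vectorized udf in PyFlink, the program fails after running for a while. I could not find any similar problem when searching; could you please take a look at what is going on?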
> Caused by: java.lang.RuntimeException: Error received from SDK harness for
> instruction 8: Traceback (most recent call last):
>  File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 253, in _execute
>   response = task()
>  File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 310, in <lambda>
>   lambda: self.create_worker().do_instruction(request), request)
>  File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 480, in do_instruction
>   getattr(request, request_type), request.instruction_id)
>  File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 515, in process_bundle
>   bundle_processor.process_bundle(instruction_id))
>  File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 978, in process_bundle
>   element.data)
>  File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 218, in process_encoded
>   self.output(decoded_value)
>  File "apache_beam/runners/worker/operations.py", line 330, in
> apache_beam.runners.worker.operations.Operation.output
>  File "apache_beam/runners/worker/operations.py", line 332, in
> apache_beam.runners.worker.operations.Operation.output
>  File "apache_beam/runners/worker/operations.py", line 195, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71,
> in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 73,
> in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>  File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 625, in decode_from_stream
>   yield self._decode_one_batch_from_stream(in_stream,
> in_stream.read_var_int64())
>  File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 636, in _decode_one_batch_from_stream
>   return arrow_to_pandas(self._timezone, self._field_types,
> [next(self._batch_reader)])
>  File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 629, in _load_from_stream
>   reader = pa.ipc.open_stream(stream)
>  File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyarrow/ipc.py",
> line 146, in open_stream
>   return RecordBatchStreamReader(source)
>  File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyarrow/ipc.py",
> line 62, in __init__
>   self._open(source)
>  File "pyarrow/ipc.pxi", line 360, in
> pyarrow.lib._RecordBatchStreamReader._open
>  File "pyarrow/error.pxi", line 123, in
> pyarrow.lib.pyarrow_internal_check_status
>  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
> OSError: Expected IPC message of type schema but got record batch


Re: PyFflink UDF Permission Denied

2020-12-27 Thread Xingbo Huang
Hi Andrew,

According to the error, you can try to check the file permission of
"/test/python-dist-78654584-bda6-4c76-8ef7-87b6fd256e4f/python-files/site-packages/site-packages/pyflink/bin/pyflink-udf-runner.sh"

Normally, the permission of this script would be
-rwxr-xr-x

Best,
Xingbo

Andrew Kramer wrote on Sun, Dec 27, 2020 at 10:29 PM:

> Hi,
>
> I am using Flink in Zeppelin and trying to execute a UDF defined in Python.
>
> The problem is I keep getting the following permission denied error in the
> log:
> Caused by:
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
> java.io.IOException: Cannot run program
> "/test/python-dist-78654584-bda6-4c76-8ef7-87b6fd256e4f/python-files/site-packages/site-packages/pyflink/bin/pyflink-udf-runner.sh":
> error=13, Permission denied at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:447)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:432)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:299)
> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:417)
> ... 14 more
>
> The code runs fine if I do not include the UDF. I have modified the java
> properties to use /test instead of /tmp
>
> Any thoughts?
>
> Thanks,
> Andrew
>


Re: Pyflink UDF with ARRAY as input

2020-12-17 Thread Xingbo Huang
Hi Torben,

It is indeed a bug, and I have created a JIRA issue [1]. The workaround
is to access the fields by index (written against release-1.12):

from pyflink.common import Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance()
    .in_streaming_mode().use_blink_planner().build())


@udf(result_type=DataTypes.STRING())
def get_string_element(my_list):
    my_string = 'xxx'
    for element in my_list:
        # Access the fields by position instead of by name.
        if element[0] == 2:
            my_string = element[1]
    return my_string


t = t_env.from_elements(
    [("1", [Row(3, "flink")]), ("3", [Row(2, "pyflink")]), ("2", [Row(2, "python")])],
    DataTypes.ROW(
        [DataTypes.FIELD("Key", DataTypes.STRING()),
         DataTypes.FIELD("List_element",
                         DataTypes.ARRAY(DataTypes.ROW(
                             [DataTypes.FIELD("integer_element", DataTypes.INT()),
                              DataTypes.FIELD("string_element", DataTypes.STRING())])))]))
print(t.select(get_string_element(t.List_element)).to_pandas())



[1] https://issues.apache.org/jira/browse/FLINK-20666

Best,
Xingbo
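
The index-based logic of the workaround can be exercised without a cluster. Below is a sketch that applies the same loop to plain tuples standing in for the ROW elements; the tuples are an assumption for illustration, since inside PyFlink the elements are whatever the runtime passes to the UDF.

```python
def get_string_element(my_list):
    """Return the string field of the last element whose integer field is 2."""
    my_string = 'xxx'
    for element in my_list:
        # element[0] is integer_element, element[1] is string_element
        if element[0] == 2:
            my_string = element[1]
    return my_string


print(get_string_element([(3, "flink"), (2, "pyflink")]))
```

When no element matches, the function falls back to the default 'xxx', as in the code from the thread.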

Barth, Torben wrote on Fri, Dec 18, 2020 at 2:46 AM:

> Dear List,
>
>
>
> I have  a table with the following structure
>
>
>
> my_table
>
> -- Key: String
>
> -- List_element: ARRAY<ROW<integer_element INT, string_element STRING>>
>
>
>
> I want to define a udf to extract information of the “list_element”. I do
> not manage to access the information of the array in the udf. I try
> something like:
>
>
>
> @udf(result_type=DataTypes.STRING())
> def get_string_element(my_list):
>     my_string = 'xxx'
>     for element in my_list:
>         if element.integer_element == 2:
>             my_string = element.string_element
>     return my_string
>
>
>
>
>
> table_env.create_temporary_function("get_string_element",
> get_string_element)
>
> *# use the function in Python Table API*
>
> my_table.select("get_string_element(List_element)")
>
>
>
> Unfortunately, I cannot get it work. Does anybody have an idea how the
> correct way to extract the information is?
>
>
>
> Any comments or ideas are very welcome.
>
>
>
> Thanks
>
> Torben
>
> Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main,
> Sitz der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main:
> HRB 7042, Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender
> des Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen;
> Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller,
> Dr. Pierre Dominique Prümm, Dr. Matthias Zieschang
>


Re: Question: does the PyFlink sink support a redis connector?

2020-12-16 Thread Xingbo Huang
Hi,

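As far as I know, Flink does not provide official support for a redis connector [1]. You need to implement your own redis
connector based on the official interfaces. For how to write a custom connector, see the documentation [2]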

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html

Best,
Xingbo


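消息室 wrote on Thu, Dec 17, 2020 at 9:33 AM:

> Hello:
>
> Our project team plans to use PyFlink, and I had the pleasure of reading your blog. I would like to ask whether the sink in the current PyFlink 1.12.0 supports a redis
> connector. Thanks!
> If not, what approach would you suggest?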


Re: pyflink: error when running a job with a defined udf

2020-12-15 Thread Xingbo Huang
Hi,

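Since you didn't provide detailed information about the job, from the error alone I can only see that it is thrown by the Python UDF you use. More specifically, the string result returned by your Python
UDF failed to deserialize on the Java side. Please check the corresponding Python UDF.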

Best,
Xingbo

Leopard wrote on Wed, Dec 16, 2020 at 9:42 AM:

> pyflink 1.11.1
>
> Fail to run sql command: SELECT
> driverStatus,userId,latitude,locTime,longitude,city_code,ad_code
> ,geo_to_h3(latitude,longitude,7) as
> h3_hash,geo_to_numpy_int_h3(latitude,longitude,7) as h3_code
> FROM lbs_trace CROSS JOIN UNNEST(datas),lateral
> table(split_json(expandInfo)) as T(city_code,ad_code)
> java.io.IOException: Fail to run stream sql job
> at
>
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:172)
> at
>
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:105)
> at
>
> org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
> at
>
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:494)
> at
>
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:257)
> at
>
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:151)
> at
>
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:111)
> at
>
> org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
> at
>
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
> at
>
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:846)
> at
>
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:738)
> at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
> at
>
> org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
> at
>
> org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: aa71b252e058bf6b0f5ec15b23d86adc)
> at
>
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1719)
> at
>
> org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
> at
>
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:161)
> ... 16 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: aa71b252e058bf6b0f5ec15b23d86adc)
> at
>
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> at
>
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at
>
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> at
>
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
> at
>
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> at
>
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at
>
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> at
>
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
> at
>
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> at
>
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
> at
>
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
> at
>
> 

Re: pyflink: installation permission problem when referencing third-party library files

2020-12-15 Thread Xingbo Huang
Hi,

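By default they are installed into the environment that `python` points to on each machine. Of course, you can also specify a different Python environment with -pyexec.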

Best,
Xingbo

magichuang wrote on Tue, Dec 15, 2020 at 8:02 PM:

> Looking at the error now: Flink has already uploaded requirements.txt and cached_dir to HDFS, because
> /yarn/nm/usercache/root/appcache/application_1608026509770_0001/flink-dist-cache-f2899798-cfd4-42b9-95d3-204e7e34b943/418cd5a449fd6ec4b61a3cc2dca4ea08/requirements.txt
> and
> /yarn/nm/usercache/root/appcache/application_1608026509770_0001/flink-dist-cache-f2899798-cfd4-42b9-95d3-204e7e34b943/418cd5a449fd6ec4b61a3cc2dca4ea08/cached_dir
> both exist on the machine when I check at submission time. However, once the program dies, the
> /yarn/nm/usercache/root/appcache/application_1608026509770_0001 folder is gone, so I feel HDFS is not the problem.
>
> What I'd like to ask now: when Flink pulls in external Python dependencies and installs libraries from the offline packages, where are they installed to?
>
> The error message I see is: Error [Errno 13] Permission denied: '' while executing command
> python setup.py egg_info
>
> so the permission problem is reported while running python setup.py.
>
> Could everyone please take a look? Thanks
>
>
>
>
> -- Original Message --
>
> From: magichuang 
>
> Sent: 2020-12-15 14:15:04
>
> To: user-zh 
>
> Cc:
>
> Subject: pyflink: installation permission problem when referencing third-party library files
>
> A question for everyone: running python demo.py directly works locally, but submitting it to the cluster fails.
>
> Flink version: 1.11, deployed as Flink on YARN in cluster mode, submitted in per-job mode, three machines
>
> Submit command: flink run -m yarn-cluster -ynm demo  -ys 2 -ytm 2048 -p 2 -py demo.py
>
> Code screenshot: https://s3.ax1x.com/2020/12/15/rKIwE6.png
>
> Error screenshot: https://s3.ax1x.com/2020/12/15/rKIlNT.png
>
> requirements.txt: IPy==1.0
> cache_dir: IPy-1.00.tar.gz
>
> Custom udf code:
>
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> def judge_ip(ip):
>     import IPy
>     if ip in IPy.IP('192.168.112.0/28'):
>         return 'in'
>     return 'out'
>
> Best wishes~
>
>


Re: Is working with states supported in pyflink1.12?

2020-12-15 Thread Xingbo Huang
Hi,

As Chesnay said, PyFlink already supports the stateless Python DataStream
APIs, so users can perform basic data transformations, but release-1.12
does not provide state access yet. A proposal[1] to enhance the API with
state access has been made, and the related discussion[2] has been started
on the dev mailing list. According to the plan, this feature will be
supported in release-1.13.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-153-Support-state-access-in-Python-DataStream-API-tt47127.html

Best,
Xingbo

Chesnay Schepler wrote on Wed, Dec 16, 2020 at 3:54 AM:

> It is currently not possible to access state with the Python API.
>
> A proposal has recently been made to enhance the API with state access
> (under FLIP-152), but at this time I cannot provide a prediction for
> when it might be released.
>
> On 12/15/2020 7:55 PM, Nadia Mostafa wrote:
> > Hello,
> >
> > I'm new to flink and trying to build a stateful application using
> > python datastream API but can't find any example of how to use states
> > in python in flink 1.12 documentation.
> > Is states supported in the python datastream API?And if so, how can I
> > use it?
> >
> > Thanks in advance!
>
>
>


Re: pyflink: installation permission error when using files from a third-party library

2020-12-14 Thread Xingbo Huang
Hi,

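From the error alone I can't tell which step is hitting the permission problem. How about uploading a wheel package instead and trying that? Installing from a tar source package runs setup.py, which may read or write directories on YARN that it has no permission for.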
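
To check ahead of time whether the cache directory would force pip to run setup.py, a small script could list which files are wheels and which are source archives. This is a minimal sketch; the function name and the list of source-archive suffixes are my own choices, not anything Flink provides.

```python
from pathlib import Path

# File suffixes treated as source distributions (an assumption for this sketch).
SDIST_SUFFIXES = (".tar.gz", ".tar.bz2", ".tgz", ".zip")


def split_wheels_and_sdists(cache_dir):
    """Return (wheels, sdists): files pip can install as-is vs. ones it must build."""
    wheels, sdists = [], []
    for path in sorted(Path(cache_dir).iterdir()):
        if path.name.endswith(".whl"):
            wheels.append(path.name)
        elif path.name.endswith(SDIST_SUFFIXES):
            sdists.append(path.name)
    return wheels, sdists
```

Anything that shows up in the second list will be built from source on the cluster, which is where setup.py gets executed.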

Best,
Xingbo


Re: Re: Can pyflink be debugged?

2020-12-14 Thread Xingbo Huang
Hi,

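The Python code you write on the client runs in a client-side process that
calls Flink's Java code through py4j to compile your job (this client-side
Python process is only used to compile the code and generate the pipeline).
At actual runtime, the non-Python parts (everything other than the various
udfs) run inside the JVM, while the Python parts (the various udfs) run in a
separate Python process.
In fact, for the next release, 1.13, we are considering connecting the process
that actually runs the Python code back to the client process that compiles
the Python code when you run and debug locally. That would make local
debugging easier, and you would no longer need to enable remote debug.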

Best,
Xingbo

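guoliubi...@foxmail.com wrote on Tue, Dec 15, 2020 at 10:29 AM:

> Hi Xingbo,
>
> Thanks a lot for the guidance; I tried it and it works.
> The source Python file finishes running on its own after a little while, and
> only some time later does execution jump to the breakpoint.
> So the source Python file only performs the submission, and the actual
> execution happens asynchronously. Is that the right way to understand it?
> If so, I have already run the source Python file many times before. Have those
> runs been executing asynchronously in the background on my machine? If so, is
> there a way to monitor those jobs?
>
>
>
> guoliubi...@foxmail.com
>
> From: Xingbo Huang
> Sent: 2020-12-15 09:59
> To: user-zh
> Subject: Re: Can pyflink be debugged?
> Hi,
> To debug, you can proceed as follows:
> 1. Create a Python Remote Debug configuration in PyCharm:
>    Run -> Python Remote Debug -> + -> choose a port (e.g. 6789)
>
> 2. Install pydevd-pycharm (into the Python interpreter your PyCharm uses):
>    pip install pydevd-pycharm
>    The dialog from the previous step also tells you how to install it.
>
> 3. Insert the following code in front of the udaf code where you want the
>    breakpoint (this snippet also comes from the remote debug configuration
>    created in step 1):
>    import pydevd_pycharm
>    pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True,
>                            stderrToServer=True)
>
> For example:
> @udaf(result_type=DataTypes.INT(), func_type="pandas")
> def mean_udaf(v):
>     import pydevd_pycharm
>     pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True,
>                             stderrToServer=True)
>     return v.mean()
>
> 4. In PyCharm, click Debug to start the Remote Debug Server you just created.
>
> 5. Click Run to run your job directly; execution will now stop at the
>    breakpoint inside your udaf code.
>
> Best,
> Xingbo
>
> guoliubi...@foxmail.com wrote on Tue, Dec 15, 2020 at 9:25 AM:
>
> > I developed a program with pyflink on Flink 1.12.0 that uses a udaf, and I
> > want to debug it in my local PyCharm environment. I set a breakpoint on the
> > first line of the udaf, but it is never hit.
> > I can confirm the program runs correctly, because I checked and data is
> > being sunk to Kafka.
> > Is some configuration needed in order to debug?
> >
> >
> >
> > guoliubi...@foxmail.com
> >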
>


Re: Can pyflink be debugged?

2020-12-14 Thread Xingbo Huang
Hi,
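To debug, you can proceed as follows:
1. Create a Python Remote Debug configuration in PyCharm:
   Run -> Python Remote Debug -> + -> choose a port (e.g. 6789)

2. Install pydevd-pycharm (into the Python interpreter your PyCharm uses):
   pip install pydevd-pycharm
   The dialog from the previous step also tells you how to install it.

3. Insert the following code in front of the udaf code where you want the
   breakpoint (this snippet also comes from the remote debug configuration
   created in step 1):
   import pydevd_pycharm
   pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True,
                           stderrToServer=True)

For example:
@udaf(result_type=DataTypes.INT(), func_type="pandas")
def mean_udaf(v):
    import pydevd_pycharm
    pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True,
                            stderrToServer=True)
    return v.mean()

4. In PyCharm, click Debug to start the Remote Debug Server you just created.

5. Click Run to run your job directly; execution will now stop at the
   breakpoint inside your udaf code.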

Best,
Xingbo

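guoliubi...@foxmail.com wrote on Tue, Dec 15, 2020 at 9:25 AM:

>
> I developed a program with pyflink on Flink 1.12.0 that uses a udaf, and I
> want to debug it in my local PyCharm environment. I set a breakpoint on the
> first line of the udaf, but it is never hit.
> I can confirm the program runs correctly, because I checked and data is being
> sunk to Kafka.
> Is some configuration needed in order to debug?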
>
>
>
> guoliubi...@foxmail.com
>


Re: Re: Problem sinking data processed by a Pandas UDF

2020-12-14 Thread Xingbo Huang
Hi,

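Joining a udtf is not the same as the join of two tables that you have in
mind. It is only because a udtf returns multiple results, and one input row on
the left has to be concatenated with the multiple output rows of the udtf, that
join is used. A udf returns only one output, so there it is a one-to-one
concatenation. If your udtf returns only one result, this concatenation is
similar to that of a udf. A udtf can expand columns directly, whereas udf and
udaf cannot, unless you use the row-based operations[1]; that feature will only
be supported in PyFlink 1.13[2].

Of course, you can also follow Weizhong's approach: have a udaf return a Row
directly and then fetch the values with get(0) and get(1). However, in 1.12 only
the general Python UDAF supports returning a Row; the Pandas UDAF cannot return
a Row result. That feature is already supported on master (1.13)[3].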

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#row-based-operations
[2] https://issues.apache.org/jira/browse/FLINK-20479
[3] https://issues.apache.org/jira/browse/FLINK-20507
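
To make the difference concrete, here is a plain-Python sketch of what joining a table function means. It does not use the PyFlink API, and `explode_csv` and `join_lateral_sketch` are hypothetical names used only for illustration. Each input row is paired with every row the table function yields for it, so a function that yields exactly one row behaves like a one-to-one udf that adds columns.

```python
def explode_csv(csv_value):
    """Table-function-like generator: yields one single-column row per number."""
    for part in csv_value.split(","):
        yield (int(part),)


def join_lateral_sketch(rows, table_func, arg_index):
    """Concatenate each input row with every row produced by table_func."""
    for row in rows:
        for produced in table_func(row[arg_index]):
            yield tuple(row) + tuple(produced)


rows = [("a", "1,2"), ("b", "3")]
for joined in join_lateral_sketch(rows, explode_csv, 1):
    print(joined)
# ('a', '1,2', 1)
# ('a', '1,2', 2)
# ('b', '3', 3)
```

Row "a" appears twice because the function produced two rows for it; row "b" appears once, which is the case that looks like a plain udf.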

Best,
Xingbo



guoliubi...@foxmail.com wrote on Mon, Dec 14, 2020 at 2:29 PM:

> Hi xingbo,
> The example in the documentation uses the udtf together with a join, but I
> don't need a join right now; I simply want to convert the result.
> If I call the udtf directly and then sink the result, I get:
> Cause: Different number of columns.
> Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
>
> Sink schema:  [buyQtl: BIGINT, aveBuy: INT]
>
>
> guoliubi...@foxmail.com
>
> From: Xingbo Huang
> Sent: 2020-12-14 11:38
> To: user-zh
> Subject: Re: Re: Problem sinking data processed by a Pandas UDF
> Hi,
> If you want to turn one column into multiple columns, you need to use a UDTF.
> For how to use it, see the documentation[1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#table-functions
> Best,
> Xingbo
>
> guoliubi...@foxmail.com wrote on Mon, Dec 14, 2020 at 11:00 AM:
>
> > Thanks for your reply. That problem is solved; as you said, @udf had to be
> > changed to @udaf.
> > But now there is another problem. According to the documentation
> > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
> > Vectorized Python aggregate functions takes one or more pandas.Series as
> > the inputs and return one scalar value as output.
> > Note The return type does not support RowType and MapType for the time
> > being.
> > a udaf may only return a single value, so inside the udaf I joined all the
> > values with ',' and returned them as a STRING. Sinking that STRING directly
> > works fine.
> > Now I use another udf afterwards to split the string again. The code is
> > roughly as follows:
> > @udf(result_type=DataTypes.ROW(
> >     [DataTypes.FIELD('value1', DataTypes.BIGINT()),
> >      DataTypes.FIELD('value2', DataTypes.INT())]))
> > def flattenStr(inputStr):
> >     ret_array = [int(x) for x in inputStr.split(',')]
> >     return Row(ret_array[0], ret_array[1])
> >
> > t_env.create_temporary_function("flattenStr", flattenStr)
> > aggregate_table = order_table.window(tumble_window) \
> >     .group_by("w") \
> >     .select("**call the udaf** as aggValue")
> > result_table = aggregate_table.select("flattenStr(aggValue) as retValue")
> > result_table.select(result_table.retValue.flatten).execute_insert("csvSink")
> >
> > Uploading this to Flink compiles without problems, but it fails at runtime.
> > I don't quite understand what the error means, and I'm not sure whether it
> > is caused by the udf returning the wrong type.
> > Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
> > at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.getPythonEnv(AbstractPythonScalarFunctionOperator.java:99)
> > at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.createPythonEnvironmentManager(AbstractPythonFunctionOperator.java:306)
> > at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.createPythonFunctionRunner(AbstractStatelessFunctionOperator.java:151)
> > at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:122)
> >
> >
> > guoliubi...@foxmail.com
> >
> > From: Wei Zhong
> > Sent: 2020-12-14 10:38
> > To: user-zh
> > Subject: Re: Problem sinking data processed by a Pandas UDF
> > Hi Lucas,
> >
> > Here is what is happening: the output type of this Pandas UDF is a single
> > column of Row, whereas your sink expects one BIGINT column and one INT column.
> >
> > You can try changing the SQL statement to the following form:
> >
> > select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
> > from `some_source`
> > group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> >
> > Also, what you have here is really a Pandas UDAF use case, right? If so, you
> > need to change "@udf" to "@udaf".
> >
> > Best,
> > Wei
> >
> > > On Dec 13, 2020, at 13:13, Lucas wrote:
> > >
> > > I'm using Flink 1.12.0 and Python 3.7. I defined a pandas UDF, roughly as
> > > follows:
> > >
> > > @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
> > >      result_type=DataTypes.ROW(
> > >          [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
> > >           DataTypes.FIELD('aveBuy', DataTypes.INT())]),
> > >      func_type='pandas')
> > > def orderCalc(code, amount):
> > >     df = pd.DataFrame({'code': code, 'amount': amount})
> > >     # after the pandas processing, the result is written to another dataframe, output
> > >     return (output['buyQtl'], output['aveBuy'])
> > >

Re: Re: Problem sinking data processed by a Pandas UDF

2020-12-13 Thread Xingbo Huang
Hi,
If you want to turn one column into multiple columns, you need to use a UDTF. For how to use it, see the documentation[1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#table-functions
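
As a rough sketch of what such a table function could look like for this case: the body is an ordinary Python generator that yields one tuple per output row. Only the generator is exercised here. The PyFlink wiring in the comments (the `udtf` arguments, the name `split_udtf` and the `join_lateral` expression) is my assumption of how it would be hooked up on 1.12 and should be checked against the documentation[1].

```python
def split_agg_value(agg_value):
    """Turn one 'buyQtl,aveBuy' string into one two-column row."""
    buy_qtl, ave_buy = (int(part) for part in agg_value.split(","))
    yield buy_qtl, ave_buy


# Assumed PyFlink 1.12 wiring (not executed here):
#   from pyflink.table import DataTypes
#   from pyflink.table.udf import udtf
#   split_udtf = udtf(split_agg_value,
#                     result_types=[DataTypes.BIGINT(), DataTypes.INT()])
#   t_env.create_temporary_function("split_udtf", split_udtf)
#   aggregate_table \
#       .join_lateral("split_udtf(aggValue) as (buyQtl, aveBuy)") \
#       .select("buyQtl, aveBuy") \
#       .execute_insert("csvSink")

print(list(split_agg_value("120,15")))  # [(120, 15)]
```

Because the function yields exactly one row per input, the lateral join here only adds the two columns and does not multiply rows.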
Best,
Xingbo

guoliubi...@foxmail.com wrote on Mon, Dec 14, 2020 at 11:00 AM:

> Thanks for your reply. That problem is solved; as you said, @udf had to be
> changed to @udaf.
> But now there is another problem. According to the documentation
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
> Vectorized Python aggregate functions takes one or more pandas.Series as
> the inputs and return one scalar value as output.
> Note The return type does not support RowType and MapType for the time
> being.
> a udaf may only return a single value, so inside the udaf I joined all the
> values with ',' and returned them as a STRING. Sinking that STRING directly
> works fine.
> Now I use another udf afterwards to split the string again. The code is
> roughly as follows:
> @udf(result_type=DataTypes.ROW(
>     [DataTypes.FIELD('value1', DataTypes.BIGINT()),
>      DataTypes.FIELD('value2', DataTypes.INT())]))
> def flattenStr(inputStr):
>     ret_array = [int(x) for x in inputStr.split(',')]
>     return Row(ret_array[0], ret_array[1])
>
> t_env.create_temporary_function("flattenStr", flattenStr)
> aggregate_table = order_table.window(tumble_window) \
>     .group_by("w") \
>     .select("**call the udaf** as aggValue")
> result_table = aggregate_table.select("flattenStr(aggValue) as retValue")
> result_table.select(result_table.retValue.flatten).execute_insert("csvSink")
>
> Uploading this to Flink compiles without problems, but it fails at runtime.
> I don't quite understand what the error means, and I'm not sure whether it is
> caused by the udf returning the wrong type.
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
> at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.getPythonEnv(AbstractPythonScalarFunctionOperator.java:99)
> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.createPythonEnvironmentManager(AbstractPythonFunctionOperator.java:306)
> at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.createPythonFunctionRunner(AbstractStatelessFunctionOperator.java:151)
> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:122)
>
>
> guoliubi...@foxmail.com
>
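> From: Wei Zhong
> Sent: 2020-12-14 10:38
> To: user-zh
> Subject: Re: Problem sinking data processed by a Pandas UDF
> Hi Lucas,
>
> Here is what is happening: the output type of this Pandas UDF is a single
> column of Row, whereas your sink expects one BIGINT column and one INT column.
>
> You can try changing the SQL statement to the following form:
>
> select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
> from `some_source`
> group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
>
> Also, what you have here is really a Pandas UDAF use case, right? If so, you
> need to change "@udf" to "@udaf".
>
> Best,
> Wei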
>
> > On Dec 13, 2020, at 13:13, Lucas wrote:
> >
> > I'm using Flink 1.12.0 and Python 3.7. I defined a pandas UDF, roughly as
> > follows:
> >
> > @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
> >      result_type=DataTypes.ROW(
> >          [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
> >           DataTypes.FIELD('aveBuy', DataTypes.INT())]),
> >      func_type='pandas')
> > def orderCalc(code, amount):
> >     df = pd.DataFrame({'code': code, 'amount': amount})
> >     # after the pandas processing, the result is written to another dataframe, output
> >     return (output['buyQtl'], output['aveBuy'])
> >
> > The csv sink is defined as follows:
> >
> > create table csvSink (
> >     buyQtl BIGINT,
> >     aveBuy INT
> > ) with (
> >     'connector.type' = 'filesystem',
> >     'format.type' = 'csv',
> >     'connector.path' = 'e:/output'
> > )
> >
> > Then I run the following:
> >
> > result_table = t_env.sql_query("""
> >     select orderCalc(code, amount)
> >     from `some_source`
> >     group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> > """)
> > result_table.execute_insert("csvSink")
> >
> > When I run the program, it reports that the result cannot be written to the
> > sink:
> >
> > py4j.protocol.Py4JJavaError: An error occurred while calling o98.executeInsert.
> > : org.apache.flink.table.api.ValidationException: Column types of query
> > result and sink for registered table
> > 'default_catalog.default_database.csvSink' do not match.
> >
> > Cause: Different number of columns.
> >
> > Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
> > Sink schema:  [buyQtl: BIGINT, aveBuy: INT]
> >
> >     at org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:304)
> >     at org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:134)
> >
> > Is the output structure of the UDF wrong, or do I need to adjust the
> > structure of the sink table?
> >
>
>
>


Re: A group window expects a time attribute for grouping in a stream environment.THANKS for your help

2020-12-09 Thread Xingbo Huang
Hi,

Your code does not show how the `Orders` table is created. For how to
specify the time attribute in DDL, you can refer to the official
documentation[1].


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html
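
For illustration, a DDL along these lines declares an event-time attribute through a WATERMARK clause and a processing-time attribute through PROCTIME(). The table name matches the question, but the column names, the 5-second delay and the `datagen` connector are assumptions of mine and are not taken from the pasted code.

```python
ORDERS_DDL = """
CREATE TABLE Orders (
    order_id BIGINT,
    amount INT,
    rowtime TIMESTAMP(3),
    proctime AS PROCTIME(),
    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen'
)
"""

# With a StreamTableEnvironment named t_env (assumed to exist), the table
# would be registered via t_env.execute_sql(ORDERS_DDL). A group window can
# then be defined on `rowtime` (event time) or `proctime` (processing time).
print(ORDERS_DDL)
```

A plain TIMESTAMP(3) column without the WATERMARK clause is just data, which is typically why the planner complains that the window has no time attribute.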

Best,
Xingbo

Appleyuchi wrote on Wed, Dec 9, 2020 at 12:42 PM:

> my code is:
> https://paste.ubuntu.com/p/gVGrj2V7ZF/
>
> it complains
>
> A group window expects a time attribute for grouping in a stream
> environment.
>
> but the data already has a time attribute,
>
> How to fix it?
> Thanks for your help.
>
>
>
>
>


Re: how to register TableAggregateFunction?

2020-12-08 Thread Xingbo Huang
Hi,

As far as I know, TableAggregateFunction is not supported yet in batch
mode[1]. You can try to use it in stream mode.


[1] https://issues.apache.org/jira/browse/FLINK-10978

Best,
Xingbo

Leonard Xu wrote on Tue, Dec 8, 2020 at 6:05 PM:

> Hi, appleyuchi
>
> Sorry for the late reply,
> but could you describe your problem in more detail or post your exception stack? The
> doc you posted already contains the section on defining and registering functions.
>
> And I suggest you post your entire code in email directly that can
> reproduce the problem, thus the community
> can understand and help you resolve the question better.
>
> Best,
> Leonard
>
>
>
>
> I'm learning document
> 
>
> part *Flat Aggregate*
>
>
> My code is:
>
> https://paste.ubuntu.com/p/HmB4q2WJSb/
>
> Could you tell me how to register TableAggregateFunction
>
> Thanks for your help
>
>
>


Re: Python UDF filter problem

2020-12-08 Thread Xingbo Huang
Hi,

This problem has been fixed[1] in 1.10.3, 1.11.3 and 1.12.0, but release-1.11.3
and release-1.12.0 have not been released yet (the VOTE has passed). I ran
your job on release-1.12, and the plan is correct.


[1] https://issues.apache.org/jira/browse/FLINK-19675

Best,
Xingbo

László Ciople wrote on Tue, Dec 8, 2020 at 5:21 PM:

> Hello,
> I am trying to use Flink v1.11.2 with Python and the Table API to read and
> write back messages to kafka topics. I am trying to filter messages based
> on the output of a udf which returns a boolean. It seems that Flink ignores
> the WHERE clause in my queries and every input message is received in the
> output topic.
> The input table is declared in SQL:
>
> --sql
> CREATE TABLE teams_event (
> `payload` ROW(
> `createdDateTime` STRING,
> `body` ROW(
> `content` STRING
> ),
> `from` ROW(
> `user` ROW(
> `displayName` STRING
> )
> ),
> `channelIdentity` ROW(
> `channelId` STRING
> )
> )
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'xdr.ms_teams2.events.messages',
> 'properties.bootstrap.servers' = 
> 'senso-kafka.solexdc01.bitdefender.biz:29030',
> 'properties.group.id' = 'teams_profanity_filter',
> 'format' = 'json',
> 'scan.startup.mode' = 'earliest-offset',
> 'json.fail-on-missing-field' = 'false',
> 'json.timestamp-format.standard' = 'ISO-8601'
> )
> """
>
> The output table is also declared in sql:
>
> --sql
> CREATE TABLE teams_profanity_event (
> `createdAt` STRING,
> `censoredMessage` STRING,
> `username` STRING,
> `channelId` STRING
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'internal.alerts.teams.rude_employees2',
> 'properties.bootstrap.servers' = 
> 'senso-kafka.solexdc01.bitdefender.biz:29030',
> 'format' = 'json'
> )
>
> I have declared two udfs and registered them in the table environment
>
> @udf(input_types=[
>          DataTypes.ROW([
>              DataTypes.FIELD("createdDateTime", DataTypes.STRING()),
>              DataTypes.FIELD("body", DataTypes.ROW([
>                  DataTypes.FIELD("content", DataTypes.STRING())
>              ])),
>              DataTypes.FIELD("from", DataTypes.ROW([
>                  DataTypes.FIELD("user", DataTypes.ROW([
>                      DataTypes.FIELD("displayName", DataTypes.STRING())
>                  ]))
>              ])),
>              DataTypes.FIELD("channelIdentity", DataTypes.ROW([
>                  DataTypes.FIELD("channelId", DataTypes.STRING())
>              ]))
>          ])],
>      result_type=DataTypes.BOOLEAN())
> def contains_profanity(payload):
>     message_content = payload[1][0]
>     found_profanity = profanity.contains_profanity(message_content)
>     logger.info(f'Message "{message_content}" contains profanity: {found_profanity}')
>     return found_profanity
>
>
> @udf(input_types=[
>          DataTypes.ROW([
>              DataTypes.FIELD("createdDateTime", DataTypes.STRING()),
>              DataTypes.FIELD("body", DataTypes.ROW([
>                  DataTypes.FIELD("content", DataTypes.STRING())
>              ])),
>              DataTypes.FIELD("from", DataTypes.ROW([
>                  DataTypes.FIELD("user", DataTypes.ROW([
>                      DataTypes.FIELD("displayName", DataTypes.STRING())
>                  ]))
>              ])),
>              DataTypes.FIELD("channelIdentity", DataTypes.ROW([
>                  DataTypes.FIELD("channelId", DataTypes.STRING())
>              ]))
>          ])],
>      result_type=DataTypes.STRING())
> def censor_profanity(payload):
>     message_content = payload[1][0]
>     censored_message = profanity.censor(message_content)
>     logger.info(f'Censored message: "{censored_message}"')
>     return censored_message
>
> The filtering of the messages and insertion into the sink is declared with
> SQL:
>
> --sql
> INSERT INTO teams_profanity_event (
> SELECT  `payload`.`createdDateTime`,
> censor_profanity(`payload`),
> `payload`.`from`.`user`.`displayName`,
> `payload`.`channelIdentity`.`channelId`
> FROM teams_event
> WHERE contains_profanity(`payload`)
> )
>
> Am I doing something wrong? It seems that the contains_profanity udf is
> not used in the pipeline:
> [image: image.png]
> Thank you in advance!
>


Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-02 Thread Xingbo Huang
Hi Pierre,

The serialization/deserialization of sparse Rows in Flink is specially
optimized. The principle is that each Row is serialized with a leading mask
that records whether the field at each position is NULL, with one bit per
field. For example, if you have 10k fields, the mask takes
10k bits / 8 = 1250 bytes. In this way, the serialization/deserialization
overhead is skipped for field values that are NULL.

For the specific optimization logic, you can refer to the Java logic[1], the
Python logic[2] and the Cython logic[3].

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java#L185
[2]
https://github.com/apache/flink/blob/master/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py#L100
[3]
https://github.com/apache/flink/blob/master/flink-python/pyflink/fn_execution/coder_impl_fast.pyx#L697
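
The arithmetic above can be sketched in plain Python. This is a simplified illustration of the null-mask idea only, not Flink's actual wire format; `null_mask` is a hypothetical helper, and the bit order within each byte is chosen arbitrarily here.

```python
def null_mask(row):
    """Pack one bit per field (1 = field is None) into a bytes object."""
    mask = bytearray((len(row) + 7) // 8)
    for index, value in enumerate(row):
        if value is None:
            mask[index // 8] |= 1 << (index % 8)
    return bytes(mask)


num_fields = 10_000
# A 99% sparse row: only every 100th field carries a value.
row = [i if i % 100 == 0 else None for i in range(num_fields)]

mask = null_mask(row)
non_null = [value for value in row if value is not None]
print(len(mask))      # 1250 bytes of mask for 10k fields
print(len(non_null))  # 100 values left to serialize
```

So for a row like this the fixed cost is the 1250-byte mask, and the per-field work is limited to the 100 populated fields.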

Best,
Xingbo

Pierre Oberholzer wrote on Thu, Dec 3, 2020 at 3:08 PM:

> Hi Xingbo, Community,
>
> Thanks a lot for your support.
> May I finally ask to conclude this thread, including wider audience:
> - Are serious performance issues to be expected with 100k fields per ROW
> (i.e. due solely to metadata overhead and independently of queries logic) ?
> - In sparse population (say 99% sparsity) already optimized in the ROW
> object or are sparse types on your roadmap ?
> Any experience with sparse Table from other users (including benchmarks
> vs. other frameworks) are also highly welcome.
>
> Thanks !
>
> Best
>
>
> On Thu, Dec 3, 2020 at 02:53, Xingbo Huang wrote:
>
>> Hi Pierre,
>>
>> This example is written based on the syntax of release-1.12 that is about
>> to be released, and the test passed. In release-1.12, input_type can be
>> omitted and expression can be used directly. If you are using release-1.11,
>> you only need to modify the grammar of udf used slightly according to the
>> udf documentation[1].
>>
>> The flink table connector supports avro format, please refer to the
>> document[2].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html#avro-format
>>
>> Best,
>> Xingbo
>>
>> Pierre Oberholzer  于2020年12月3日周四 上午2:57写道:
>>
>>> Hi Xingbo,
>>>
>>> Nice ! This looks a bit hacky, but shows that it can be done ;)
>>>
>>> I just got an exception preventing me running your code, apparently from
>>> udf.py:
>>>
>>> TypeError: Invalid input_type: input_type should be DataType but
>>> contains None
>>>
>>> Can you pls check again ?
>>> If the schema is defined is a .avsc file, do we have to parse it and
>>> rebuild those syntax (ddl and udf) and or is there an existing component
>>> that could be used ?
>>>
>>> Thanks a lot !
>>>
>>> Best,
>>>
>>>
>>> Le mer. 2 déc. 2020 à 04:50, Xingbo Huang  a écrit :
>>>
>>>> Hi Pierre,
>>>>
>>>> I wrote a PyFlink implementation, you can see if it meets your needs:
>>>>
>>>>
>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
>>>> DataTypes
>>>> from pyflink.table.udf import udf
>>>>
>>>>
>>>> def test():
>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>> env.set_parallelism(1)
>>>> t_env = StreamTableEnvironment.create(env,
>>>>
>>>> environment_settings=EnvironmentSettings.new_instance()
>>>>
>>>> .in_streaming_mode().use_blink_planner().build())
>>>>
>>>> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>>>>   '80m')
>>>>
>>>> # 10k nested columns
>>>> num_field = 10_000
>>>> fields = ['f%s INT' % i for i in range(num_field)]
>>>> field_str = ','.join(fields)
>>>> t_env.execute_sql(f"""
>>>> CREATE TABLE source_table (
>>>> f0 BIGINT,
>>>> f1 DECIMAL(32,2),
>>>> f2 ROW<${field_str}>,
>>>> f3 TIMESTAMP(3)
>>>> ) WITH (
>>>

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-02 Thread Xingbo Huang
Hi Pierre,

This example is written with the syntax of release-1.12, which is about
to be released, and the test passed. In release-1.12, input_types can be
omitted and expressions can be used directly. If you are using release-1.11,
you only need to adjust the udf syntax slightly according to the
udf documentation[1].

The Flink table connector supports the Avro format; please refer to the
documentation[2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html#avro-format

Best,
Xingbo

Pierre Oberholzer wrote on Thu, Dec 3, 2020 at 2:57 AM:

> Hi Xingbo,
>
> Nice ! This looks a bit hacky, but shows that it can be done ;)
>
> I just got an exception preventing me running your code, apparently from
> udf.py:
>
> TypeError: Invalid input_type: input_type should be DataType but contains
> None
>
> Can you pls check again ?
> If the schema is defined is a .avsc file, do we have to parse it and
> rebuild those syntax (ddl and udf) and or is there an existing component
> that could be used ?
>
> Thanks a lot !
>
> Best,
>
>
> On Wed, Dec 2, 2020 at 04:50, Xingbo Huang wrote:
>
>> Hi Pierre,
>>
>> I wrote a PyFlink implementation, you can see if it meets your needs:
>>
>>
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
>> from pyflink.table.udf import udf
>>
>>
>> def test():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     env.set_parallelism(1)
>>     t_env = StreamTableEnvironment.create(
>>         env,
>>         environment_settings=EnvironmentSettings.new_instance()
>>         .in_streaming_mode().use_blink_planner().build())
>>     t_env.get_config().get_configuration().set_string(
>>         "taskmanager.memory.task.off-heap.size", '80m')
>>
>>     # 10k nested columns
>>     num_field = 10_000
>>     fields = ['f%s INT' % i for i in range(num_field)]
>>     field_str = ','.join(fields)
>>     t_env.execute_sql(f"""
>>         CREATE TABLE source_table (
>>             f0 BIGINT,
>>             f1 DECIMAL(32,2),
>>             f2 ROW<{field_str}>,
>>             f3 TIMESTAMP(3)
>>         ) WITH (
>>             'connector' = 'datagen',
>>             'number-of-rows' = '2'
>>         )
>>     """)
>>
>>     t_env.execute_sql(f"""
>>         CREATE TABLE print_table (
>>             f0 BIGINT,
>>             f1 DECIMAL(32,2),
>>             f2 ROW<{field_str}>,
>>             f3 TIMESTAMP(3)
>>         ) WITH (
>>             'connector' = 'print'
>>         )
>>     """)
>>     result_type = DataTypes.ROW(
>>         [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in range(num_field)])
>>
>>     # Identity udf over the nested ROW column
>>     func = udf(lambda x: x, result_type=result_type)
>>
>>     source = t_env.from_path("source_table")
>>     result = source.select(source.f0, source.f1, func(source.f2), source.f3)
>>     result.execute_insert("print_table")
>>
>>
>> if __name__ == '__main__':
>>     test()
>>
>>
>>  Best,
>>  Xingbo
>>
>> Pierre Oberholzer  于2020年12月1日周二 下午6:10写道:
>>
>>> Hi Xingbo,
>>>
>>> That would mean giving up on using Flink (table) features on the content
>>> of the parsed JSON objects, so definitely a big loss. Let me know if I
>>> missed something.
>>>
>>> Thanks !
>>>
>>> Le mar. 1 déc. 2020 à 07:26, Xingbo Huang  a écrit :
>>>
>>>> Hi Pierre,
>>>>
>>>> Have you ever thought of declaring your entire json as a string field
>>>> in `Table` and putting the parsing work in UDF?
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> Pierre Oberholzer  于2020年12月1日周二 上午4:13写道:
>>>>
>>>>> Hi Xingbo,
>>>>>
>>>>> Many thanks for your follow up. Yes you got it right.
>>>>> So using Table API and a ROW object for the nested output of my UDF,
>>>>> and since types are mandatory, I guess this boils down to:
>>>>> - How to nicely specify the types for the 100k fields : shall I use
>>>>> TypeInformation [1] or better retrieve i

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-01 Thread Xingbo Huang
Hi Pierre,

I wrote a PyFlink implementation, you can see if it meets your needs:


from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf


def test():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
        .in_streaming_mode().use_blink_planner().build())
    t_env.get_config().get_configuration().set_string(
        "taskmanager.memory.task.off-heap.size", '80m')

    # 10k nested columns
    num_field = 10_000
    fields = ['f%s INT' % i for i in range(num_field)]
    field_str = ','.join(fields)
    t_env.execute_sql(f"""
        CREATE TABLE source_table (
            f0 BIGINT,
            f1 DECIMAL(32,2),
            f2 ROW<{field_str}>,
            f3 TIMESTAMP(3)
        ) WITH (
            'connector' = 'datagen',
            'number-of-rows' = '2'
        )
    """)

    t_env.execute_sql(f"""
        CREATE TABLE print_table (
            f0 BIGINT,
            f1 DECIMAL(32,2),
            f2 ROW<{field_str}>,
            f3 TIMESTAMP(3)
        ) WITH (
            'connector' = 'print'
        )
    """)
    result_type = DataTypes.ROW(
        [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in range(num_field)])

    # Identity udf over the nested ROW column
    func = udf(lambda x: x, result_type=result_type)

    source = t_env.from_path("source_table")
    result = source.select(source.f0, source.f1, func(source.f2), source.f3)
    result.execute_insert("print_table")


if __name__ == '__main__':
    test()


 Best,
 Xingbo

Pierre Oberholzer wrote on Tue, Dec 1, 2020 at 6:10 PM:

> Hi Xingbo,
>
> That would mean giving up on using Flink (table) features on the content
> of the parsed JSON objects, so definitely a big loss. Let me know if I
> missed something.
>
> Thanks !
>
> On Tue, Dec 1, 2020 at 07:26, Xingbo Huang wrote:
>
>> Hi Pierre,
>>
>> Have you ever thought of declaring your entire json as a string field in
>> `Table` and putting the parsing work in UDF?
>>
>> Best,
>> Xingbo
>>
>> Pierre Oberholzer  于2020年12月1日周二 上午4:13写道:
>>
>>> Hi Xingbo,
>>>
>>> Many thanks for your follow up. Yes you got it right.
>>> So using Table API and a ROW object for the nested output of my UDF, and
>>> since types are mandatory, I guess this boils down to:
>>> - How to nicely specify the types for the 100k fields : shall I use
>>> TypeInformation [1] or better retrieve it from Schema Registry [2] ?
>>> - Do I have to put NULL values for all the fields that don't have a
>>> value in my JSON ?
>>> - Will the resulting Table be "sparse" and suffer performance
>>> limitations ?
>>> Let me know if Table API and ROW are the right candidates here, or if
>>> other better alternatives exist.
>>> As said I'd be glad to apply some downstream transformations using
>>> key,value access (and possibly some Table <-> Pandas operations). Hope that
>>> doesn't make it a too long wish list ;)
>>>
>>> Thanks a lot !
>>>
>>> Best regards,
>>>
>>> [1]
>>> https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
>>> [2]
>>> https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
>>>
>>> Le sam. 28 nov. 2020 à 04:04, Xingbo Huang  a
>>> écrit :
>>>
>>>> Hi Pierre,
>>>>
>>>> Sorry for the late reply.
>>>> Your requirement is that your `Table` has a `field` in `Json` format
>>>> and its key has reached 100k, and then you want to use such a `field` as
>>>> the input/output of `udf`, right? As to whether there is a limit on the
>>>> number of nested key, I am not quite clear. Other contributors with
>>>> experience in this area may have answers. On the part of `Python UDF`, if
>>>> the type of key or value of your `Map` is `Any`, we do not support it now.
>>>> You need to specify a specific type. For more information, please refer to
>>>> the related document[1].
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> 2020年11月28日 上午12:49,Pierre Oberholzer  写道:
>>>>
>>>> Hello Wei, 

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-30 Thread Xingbo Huang
Hi Pierre,

Have you ever thought of declaring your entire json as a string field in
`Table` and putting the parsing work in UDF?
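
A minimal sketch of that idea: the JSON stays a single STRING column and a scalar function extracts what is needed at query time. Only the plain-Python parsing is shown and exercised here. `json_get` and its dotted-path convention are hypothetical, and the PyFlink wrapping in the comment is an assumption rather than tested code.

```python
import json


def json_get(json_str, dotted_path):
    """Return the value at `dotted_path` (e.g. 'a.b') as a string, or None if absent."""
    node = json.loads(json_str)
    for key in dotted_path.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    # Strings are returned as-is; anything else is re-encoded as JSON text.
    return node if isinstance(node, str) else json.dumps(node)


# Assumed PyFlink wiring (not executed here):
#   from pyflink.table import DataTypes
#   from pyflink.table.udf import udf
#   json_get_udf = udf(json_get, result_type=DataTypes.STRING())
#   (on release-1.11, input_types would also have to be given)

record = '{"user": {"name": "alice", "age": 30}}'
print(json_get(record, "user.name"))     # alice
print(json_get(record, "user.age"))      # 30
print(json_get(record, "user.missing"))  # None
```

The trade-off is the one raised in this thread: the table schema stays trivial, but the keys inside the JSON are no longer visible to the planner as columns.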

Best,
Xingbo

Pierre Oberholzer wrote on Tue, Dec 1, 2020 at 4:13 AM:

> Hi Xingbo,
>
> Many thanks for your follow up. Yes you got it right.
> So using Table API and a ROW object for the nested output of my UDF, and
> since types are mandatory, I guess this boils down to:
> - How to nicely specify the types for the 100k fields : shall I use
> TypeInformation [1] or better retrieve it from Schema Registry [2] ?
> - Do I have to put NULL values for all the fields that don't have a value
> in my JSON ?
> - Will the resulting Table be "sparse" and suffer performance limitations ?
> Let me know if Table API and ROW are the right candidates here, or if
> other better alternatives exist.
> As said I'd be glad to apply some downstream transformations using
> key,value access (and possibly some Table <-> Pandas operations). Hope that
> doesn't make it a too long wish list ;)
>
> Thanks a lot !
>
> Best regards,
>
> [1]
> https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
> [2]
> https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
>
> On Sat, Nov 28, 2020 at 04:04, Xingbo Huang wrote:
>
>> Hi Pierre,
>>
>> Sorry for the late reply.
>> As I understand it, your `Table` has a `field` in `Json` format with up to
>> 100k keys, and you want to use that `field` as the input/output of a `udf`,
>> right? I am not sure whether there is a limit on the number of nested keys;
>> other contributors with experience in this area may be able to answer. As
>> for `Python UDF`, a `Map` whose key or value type is `Any` is not supported
>> at the moment, so you need to specify a concrete type. For more
>> information, please refer to the related document [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>>
>> Best,
>> Xingbo
>>
>> On Nov 28, 2020, at 12:49 AM, Pierre Oberholzer wrote:
>>
>> Hello Wei, Dian, Xingbo,
>>
>> Not really sure when it is appropriate to knock on the door of the
>> community ;)
>> I just wanted to mention that your feedback on the above topic will be
>> highly appreciated as it will condition the choice of framework on our side
>> for the months to come, and potentially help the community to cover sparse
>> data with Flink.
>>
>> Thanks a lot !
>>
>> Have a great week-end
>>
>> Best,
>>
>> On Fri, Nov 20, 2020 at 10:11, Pierre Oberholzer <
>> pierre.oberhol...@gmail.com> wrote:
>>
>>> Hi Wei,
>>>
>>> Thanks for the hint. May I please follow up by adding more context and
>>> ask for your guidance.
>>>
>>> In case the aforementioned Map[String,Any] object returned by Scala:
>>>
>>> - Has a defined schema (incl. nested) with up to 100k (!) different
>>> possible keys
>>> - Has only some portion of the keys populated for each record
>>> - Is convertible to JSON
>>> - Has to undergo downstream processing in Flink and/or Python UDF with
>>> key value access
>>> - Has to be ultimately stored in a Kafka/AVRO sink
>>>
>>> How would you declare the types explicitly in such a case ?
>>>
>>> Thanks for your support !
>>>
>>> Pierre
>>>
>>> On Thu, Nov 19, 2020 at 03:54, Wei Zhong wrote:
>>>
>>>> Hi Pierre,
>>>>
>>>> Currently there is no type hint like ‘Map[String, Any]’. The
>>>> recommended way is declaring your type more explicitly.
>>>>
>>>> If you insist on doing this, you can try declaring a RAW data type
>>>> for java.util.HashMap [1], but you may encounter some troubles [2] related
>>>> to the kryo serializers.
>>>>
>>>> Best,
>>>> Wei
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
>>>> [2]
>>>> https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>>>>
>>>>
>>>> On Nov 19, 2020, at 04:31, Pierre Oberholzer wrote:
>>>>
>>>> Hi Wei,
>>>>
>>>> It works ! Thanks a lot for your support.
>>>> I hadn't tried this last combination for option 1, and I had wrong
>>>> syntax for o

Re: Unsubscribe

2020-11-30 Thread Xingbo Huang
Hi,

To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org

For details, see [1].

[1] https://flink.apache.org/zh/community.html#section-1

Best,
Xingbo

On Tue, Dec 1, 2020 at 9:42 AM, elvis wrote:

> Unsubscribe


Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-27 Thread Xingbo Huang
Hi Pierre,

Sorry for the late reply.
As I understand it, your `Table` has a `field` in `Json` format with up to
100k keys, and you want to use that `field` as the input/output of a `udf`,
right? I am not sure whether there is a limit on the number of nested keys;
other contributors with experience in this area may be able to answer. As for
`Python UDF`, a `Map` whose key or value type is `Any` is not supported at the
moment, so you need to specify a concrete type. For more information, please
refer to the related document [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
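
One possible way to arrive at a concrete type, sketched here under the
assumption that carrying the values as JSON text is acceptable: encode every
value as a string, so the column can be declared as a map from STRING to
STRING instead of a map to `Any`. The helper name `to_string_map` is
hypothetical; consumers decode individual values with `json.loads` when they
need them.

```python
import json
from typing import Any, Dict


def to_string_map(record: Dict[str, Any]) -> Dict[str, str]:
    """Encode every value as a JSON string and drop unpopulated (None) keys."""
    return {
        str(key): json.dumps(value)
        for key, value in record.items()
        if value is not None
    }


sparse = {"a": 1, "b": "x", "c": None, "d": {"e": True}}
encoded = to_string_map(sparse)
print(encoded)

# A consumer recovers the original value of a single key on demand.
print(json.loads(encoded["d"]))
```

Dropping the None entries keeps sparse records small, at the cost of a
decode step on every access.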

Best,
Xingbo

> On Nov 28, 2020, at 12:49 AM, Pierre Oberholzer wrote:
> 
> Hello Wei, Dian, Xingbo,
> 
> Not really sure when it is appropriate to knock on the door of the community 
> ;)
> I just wanted to mention that your feedback on the above topic will be highly 
> appreciated as it will condition the choice of framework on our side for the 
> months to come, and potentially help the community to cover sparse data with 
> Flink.
> 
> Thanks a lot !
> 
> Have a great week-end
> 
> Best,
> 
> On Fri, Nov 20, 2020 at 10:11, Pierre Oberholzer wrote:
> Hi Wei,
> 
> Thanks for the hint. May I please follow up by adding more context and ask 
> for your guidance.
> 
> In case the aforementioned Map[String,Any] object returned by Scala:
> 
> - Has a defined schema (incl. nested) with up to 100k (!) different possible 
> keys
> - Has only some portion of the keys populated for each record
> - Is convertible to JSON
> - Has to undergo downstream processing in Flink and/or Python UDF with key 
> value access
> - Has to be ultimately stored in a Kafka/AVRO sink
> 
> How would you declare the types explicitly in such a case ?
> 
> Thanks for your support !
> 
> Pierre
> 
> On Thu, Nov 19, 2020 at 03:54, Wei Zhong wrote:
> Hi Pierre,
> 
> Currently there is no type hint like ‘Map[String, Any]’. The recommended way 
> is declaring your type more explicitly.
> 
> If you insist on doing this, you can try declaring a RAW data type for
> java.util.HashMap [1], but you may encounter some troubles [2] related to the 
> kryo serializers.
> 
> Best,
> Wei
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
> [2]
> https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>
>> On Nov 19, 2020, at 04:31, Pierre Oberholzer wrote:
>> 
>> Hi Wei,
>> 
>> It works ! Thanks a lot for your support.
>> I hadn't tried this last combination for option 1, and I had wrong syntax 
>> for option 2.
>> 
>> So to summarize..
>> 
>> Methods working:
>> - Current: DataTypeHint in UDF definition + SQL for UDF registering
>> - Outdated: override getResultType in UDF definition + 
>> t_env.register_java_function for UDF registering
>> 
>> Type conversions working:
>> - scala.collection.immutable.Map[String,String] => 
>> org.apache.flink.types.Row => ROW
>> - scala.collection.immutable.Map[String,String] => 
>> java.util.Map[String,String] => MAP
>> 
>> Any hint for Map[String,Any] ?
>> 
>> Best regards,
>> 
>> On Wed, Nov 18, 2020 at 03:26, Wei Zhong wrote:
>> Hi Pierre,
>> 
>> Both approaches work on my local machine. Here is my code:
>> 
>> Scala UDF:
>> package com.dummy
>> 
>> import org.apache.flink.api.common.typeinfo.TypeInformation
>> import org.apache.flink.table.annotation.DataTypeHint
>> import org.apache.flink.table.api.Types
>> import org.apache.flink.table.functions.ScalarFunction
>> import org.apache.flink.types.Row
>> 
>> /**
>>   * The scala UDF.
>>   */
>> class dummyMap extends ScalarFunction {
>> 
>>   // If the UDF is registered via a SQL statement, this type hint is required.
>>   @DataTypeHint("ROW<s STRING, t STRING>")
>>   def eval(): Row = {
>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>   }
>>
>>   // If the UDF is registered via 'register_java_function', override this
>>   // method instead.
>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
>>     // The return type must be described as TypeInformation.
>>     Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
>>   }
>> }
>> Python code:
>> 
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment
>> 
>> s_env = StreamExecutionEnvironment.get_execution_environment()
>> st_env = StreamTableEnvironment.create(s_env)
>> 
>> # load the scala udf jar file, the path should be 
