Re: DataStream Join produces no output and causes program crash

2023-02-16 Thread Shuiqiang Chen
Hi Reme, The code you provided looks good to me. Maybe you can add some logs in the getKey() and join() functions for debugging, to observe whether any record was successfully joined. By the way, the metrics in the WebUI dashboard might also be of help. Best, Shuiqiang Reme Ajayi
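One lightweight way to add such logs without touching the join logic is a small decorator. A plain-Python sketch (the function and the "user_id" field are hypothetical, not from the original thread):

```python
import logging

logging.basicConfig(level=logging.INFO)

def logged(fn):
    """Wrap a function so each call and its result are logged --
    handy for instrumenting getKey()/join() while debugging."""
    def wrapper(*args):
        result = fn(*args)
        logging.info("%s(%r) -> %r", fn.__name__, args, result)
        return result
    return wrapper

@logged
def get_key(record):
    # hypothetical key extractor for the join
    return record["user_id"]
```

If get_key() logs on both streams but join() never does, the two streams likely share no keys in the same window.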

Re: Weird Flink SQL error

2022-11-23 Thread Shuiqiang Chen
Hi Dan, Which Flink version are you using? I wrote a test case based on the code snippet you provided and it works normally on Flink 1.17-SNAPSHOT. Best, Shuiqiang Dan Hill wrote on Wed, Nov 23, 2022 at 13:55: > Hi. I'm hitting an obfuscated Flink SQL parser error. Is there a way to > get better errors

Re: FW: FW: Issue Facing While Using EmbeddedRocksDbCheckpointing FlinkVersion(1.15.0)

2022-06-13 Thread Shuiqiang Chen
; *Cc:* user > *Subject:* Re: FW: Issue Facing While Using EmbeddedRocksDbCheckpointing > FlinkVersion(1.15.0) > > > > None of the attachments are logs of the TaskManager. The TaskManager log > should be located in the directory > `E:\pythonProject16\lib\site-packages\pyflink\log`. >

Re: Issue Facing While Using EmbeddedRocksDbCheckpointing FlinkVersion(1.15.0)

2022-06-03 Thread Shuiqiang Chen
Hi, I guess the traceback log you provided might not be the root cause of the failure; could you please provide the complete log of the TaskManager? Best, Shuiqiang harshit.varsh...@iktara.ai wrote on Thu, Jun 2, 2022 at 22:04: > Dear Team, > > > > I am new to pyflink and request your support in

Re: PyFlink import internal packages

2021-12-03 Thread Shuiqiang Chen
Hi, Actually, you can develop your app in a plain Python way. It's fine to split the code into multiple files, and there is no need to call `env.add_python_file()` explicitly. When submitting the PyFlink job, you can specify Python files and the entry main module with the options --pyFiles and
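A submission along these lines, for example (file and module names are hypothetical; flag spellings follow the Flink CLI):

```shell
# entry file plus extra dependency files/packages
./bin/flink run --python main.py --pyFiles utils.py,my_package.zip

# or name an entry module instead of an entry file
./bin/flink run --pyModule main --pyFiles main.py,utils.py
```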

Re: Fetch data from Rest API and sink to Kafka topic

2021-11-08 Thread Shuiqiang Chen
Hi Sharma, From your description, it seems that you need to implement a custom source to fetch data from an HTTP server. Please refer to data sources [1] to learn how to develop a data source. And FYI, there is a

Re: Submitting a Flink job throws java.lang.LinkageError

2021-11-07 Thread Shuiqiang Chen
Hi, Could you check whether the Kafka client version inside your job jar matches the one on the platform? casel.chen wrote on Fri, Nov 5, 2021 at 11:25 PM: > I submitted a job written with the Streaming API on our company's real-time computing platform and it threw the exception below. Our platform is mainly Flink > SQL based and already integrates flink-kafka-connector. The job I submitted also consumes from Kafka, so I packaged the same version of the Flink Kafka > connector into the job jar as well. What is the cause, and how do I fix it? Thanks! > > > 2021-11-05 16:38:58 -

Re: Application mode - Custom Flink docker image with Python user code

2021-10-26 Thread Shuiqiang Chen
Hi Sumeet, Actually, running PyFlink jobs in application mode on Kubernetes has been supported since release 1.13. To build a docker image with PyFlink installed, please refer to Enabling Python [1]. In order to run the Python code in application mode, you also need to COPY the code files into
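A minimal sketch of such a submission, assuming Flink's native Kubernetes application mode and a custom image (the image and module names are hypothetical):

```shell
./bin/flink run-application -t kubernetes-application \
  -Dkubernetes.container.image=my-pyflink-image:1.13 \
  -pym my_job   # entry module COPY'd into the image
```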

Re: flink-yarn per-job mode

2021-10-26 Thread Shuiqiang Chen
Hi, the uploaded image cannot be loaded. This situation means YARN is unable to launch the TaskManager; could you check whether your YARN resources are sufficient? 王健 <13166339...@163.com> wrote on Tue, Oct 26, 2021 at 7:50 PM: > Hello: > I deployed Flink on YARN in per-job mode and it fails at runtime; could you take a look at the cause? Many thanks. > > 1. Run command: /usr/local/flink-1.13.2/bin/flink run -t yarn-per-job -c > com.worktrans.flink.wj.ods.FlinkCDC01

Re: [EXTERNAL] Re: PyFlink DataStream Example Kafka/Kinesis?

2021-03-25 Thread Shuiqiang Chen
s contribution as you like. Best, Shuiqiang Bohinski, Kevin wrote on Thu, Mar 25, 2021 at 12:00 PM: > Hi Shuiqiang, > > > > Thanks for letting me know. Feel free to send any beginner-level > contributions for this effort my way. > > > > Best, > > kevin > > > > *From: *

Re: PyFlink DataStream Example Kafka/Kinesis?

2021-03-24 Thread Shuiqiang Chen
to implement the > connector over the weekend? > > I am interested in contributing to Flink, and I think this can be a good > starting point to me > > Best > Bin > > On Wed, Mar 24, 2021 at 7:49 PM Shuiqiang Chen > wrote: > >> I have just created the jira &

Re: PyFlink DataStream Example Kafka/Kinesis?

2021-03-24 Thread Shuiqiang Chen
; Bin > > On Wed, Mar 24, 2021 at 7:30 PM Shuiqiang Chen > wrote: > >> Hi Kevin, >> >> Kinesis connector is not supported yet in Python DataStream API. We will >> add it in the future. >> >> Best, >> Shuiqiang >> >> Bohinski,

Re: PyFlink DataStream Example Kafka/Kinesis?

2021-03-24 Thread Shuiqiang Chen
Hi Kevin, The Kinesis connector is not supported yet in the Python DataStream API. We will add it in the future. Best, Shuiqiang Bohinski, Kevin wrote on Thu, Mar 25, 2021 at 5:03 AM: > Is there a kinesis example? > > > > *From: *"Bohinski, Kevin" > *Date: *Wednesday, March 24, 2021 at 4:40 PM > *To: *"Bohinski,

Re: Pyflink tutorial output

2021-03-23 Thread Shuiqiang Chen
Hi Robert, Have you tried exploring the /tmp/output directory in the task manager pods on your Kubernetes cluster? The StreamingFileSink creates the output directory on the host of the task manager in which the sink tasks are executed. Best, Shuiqiang Robert Cullen wrote on Wed, Mar 24, 2021 at 2:48 AM: >
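Concretely, that exploration might look like this (the pod name is a placeholder to fill in):

```shell
kubectl get pods | grep taskmanager
kubectl exec -it <taskmanager-pod> -- ls /tmp/output
```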

Re: Working with DataStreams of Java objects in Pyflink?

2021-03-15 Thread Shuiqiang Chen
Hi Kevin, Currently, POJO types are not supported in the Python DataStream API because it is hard to deal with the conversion between Python objects and Java objects. Maybe you can use a Row type to represent the POJO class, such as Types.ROW_NAMED([id, created_at, updated_at], [Types.LONG(),
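The mapping from a Python class to a named row can be sketched in plain Python (pyflink-free; the Event class and its fields are hypothetical stand-ins for the POJO):

```python
from dataclasses import astuple, dataclass, fields

@dataclass
class Event:
    # stands in for the POJO-like class
    id: int
    created_at: str
    updated_at: str

def to_row(obj):
    """Flatten a dataclass instance into the (field_names, values) shape
    that a named row type such as Types.ROW_NAMED expects."""
    return [f.name for f in fields(obj)], astuple(obj)

names, values = to_row(Event(1, "2021-03-15", "2021-03-16"))
```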

Re: Python DataStream API Questions -- Java/Scala Interoperability?

2021-03-14 Thread Shuiqiang Chen
ructor of the custom sink/source. > > What's the best way to pass arguments to the constructor? > > On Fri, Mar 5, 2021 at 4:29 PM Kevin Lam wrote: > >> Thanks Shuiqiang! That's really helpful, we'll give the connectors a try. >> >> On Wed, Mar 3, 2021 at 4:02 AM Sh

Re: Python StreamExecutionEnvironment from_collection Kafka example

2021-03-12 Thread Shuiqiang Chen
nks. > > On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen > wrote: > >> Hi Robert, >> >> Kafka Connector is provided in Python DataStream API since >> release-1.12.0. And the documentation for it is lacking, we will make it up >> soon. >> >

Re: Python StreamExecutionEnvironment from_collection Kafka example

2021-03-12 Thread Shuiqiang Chen
Hi Robert, The Kafka connector has been provided in the Python DataStream API since release-1.12.0. The documentation for it is lacking; we will make it up soon. The following code shows how to apply a KafkaConsumer and KafkaProducer: ``` env = StreamExecutionEnvironment.get_execution_environment()

Re: Error submitting a PyFlink job to a local cluster

2021-03-09 Thread Shuiqiang Chen
Hi Huilin, Which Flink version are you using? Huilin_WU <592900...@qq.com> wrote on Wed, Mar 10, 2021 at 9:39 AM: > In a terminal, running the file with `python xx.py` works, but `flink run -m localhost:8081 -py > xx.py` reports the error above saying the pyflink components are missing. > (base) huilin@huilin-Lenovo:~/Documents/Learning/experiment$ flink run -m > localhost:8081 -py demo_predict.py >

Re: Running Pyflink job on K8s Flink Cluster Deployment?

2021-03-06 Thread Shuiqiang Chen
tl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
# Create the deployments for the cluster
$ kubectl create -f job-manager.yaml
$ kubectl create -f task-manager.yaml
Best, Shuiqiang Shuiqiang Chen wrote on Sat, Mar 6, 2021 at 5:10 PM: > Hi Kevin, > > You are a

Re: Running Pyflink job on K8s Flink Cluster Deployment?

2021-03-06 Thread Shuiqiang Chen
Hi Kevin, You are able to run PyFlink applications on a Kubernetes cluster; both native K8s mode and resource-definition mode have been supported since release-1.12.0. Currently, Python and PyFlink are not enabled in the official Flink docker image, so you might need to build a custom image with Python and
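A minimal custom image along those lines might look like this (the base tag and PyFlink version are assumptions; match them to your cluster):

```dockerfile
FROM flink:1.12.2

# Install Python 3 and pip, then PyFlink itself
RUN apt-get update -y && \
    apt-get install -y python3 python3-pip python3-dev && \
    ln -s /usr/bin/python3 /usr/bin/python
RUN pip3 install apache-flink==1.12.2
```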

Re: Convert BIGINT to TIMESTAMP in pyflink when using datastream api

2021-03-04 Thread Shuiqiang Chen
Hi Shilpa, There might be something wrong when defining the rowtime field with the connector descriptor; it's recommended to use SQL DDL to create tables and do queries with the Table API. Best, Shuiqiang Shilpa Shankar wrote on Thu, Mar 4, 2021 at 9:29 PM: > Hello, > > We are using pyflink's datastream api

Re: PyFlink Connection Refused to Kubernetes Session Cluster

2021-03-04 Thread Shuiqiang Chen
Hi Robert, It seems the retrieved address of the JobManager is a cluster-internal IP that can only be accessed inside the cluster. As you said, you might need to create an ingress to expose the JobManager service so that the client can access it from outside the k8s cluster. Best, Shuiqiang Robert
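Besides an Ingress, a quick workaround for testing is to port-forward the JobManager's REST service (the service name flink-jobmanager is an assumption; substitute yours):

```shell
kubectl port-forward service/flink-jobmanager 8081:8081
# then, from the client machine:
./bin/flink run -m localhost:8081 -py job.py
```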

Re: Python DataStream API Questions -- Java/Scala Interoperability?

2021-03-03 Thread Shuiqiang Chen
Hi Kevin, Thank you for your questions. Currently, users are not able to define custom sources/sinks in Python. This would be a great feature that could unify end-to-end PyFlink application development in Python, but it is a large topic that we have no plan to support at present. As you have noticed

Re: Help request (PyFlink)

2021-01-30 Thread Shuiqiang Chen
Sorry, I missed the documentation link: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/kafka.html#enabling-kerberos-authentication Shuiqiang Chen wrote on Sat, Jan 30, 2021 at 4:32 PM: > Hi, > Following that document, have you configured the security.kerberos.login.keytab and > security.kerberos.login.principal properties in flink-conf.yaml

Re: Help request (PyFlink)

2021-01-30 Thread Shuiqiang Chen
Hi, Following this document, have you configured the security.kerberos.login.keytab and security.kerberos.login.principal properties in flink-conf.yaml? Also, is the jaas.conf file accessible on every machine where a task manager runs? 瞿叶奇 <389243...@qq.com> wrote on Sat, Jan 30, 2021 at 4:15 PM: > Hello teacher, > 1) The error message is in the attachment, in the file flink-root-python-node-master1aSdM.log. The error is as follows: > Caused by:

Re: Help request (PyFlink)

2021-01-29 Thread Shuiqiang Chen
Hi, You can check the log of the task manager running the source task to see whether the consumer successfully fetched the Kafka partition metadata and whether authentication succeeded. 瞿叶奇 <389243...@qq.com> wrote on Sat, Jan 30, 2021 at 3:14 PM: > Hello teacher, consuming itself works without any problem. > > > > > -- Original message -- > *From:* "user-zh" ; > *Sent:* Saturday, Jan 30, 2021, 3:08 PM > *To:* "user-zh"; >

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-18 Thread Shuiqiang Chen
Hi meneldor, Actually, the return type of on_timer() must be the same as that of process_element(). It seems that the value yielded by process_element() is missing the `timestamp` field. Also, the `output_type_info` has four field names but five field types. Could you align them? Best, Shuiqiang

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-17 Thread Shuiqiang Chen
>> [1] https://issues.apache.org/jira/browse/FLINK-20647 >> >> Best, >> Xingbo >> >> meneldor wrote on Fri, Jan 15, 2021 at 1:20 AM: >> >>> Thank you for the answer Shuiqiang! >>> I'm using the latest apache-flink version: >>>>

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-14 Thread Shuiqiang Chen
Hi meneldor, The main cause of the error is a bug in `ctx.timer_service().current_watermark()`. At the beginning of the stream, when the first record comes into KeyedProcessFunction.process_element(), the current watermark will be Long.MIN_VALUE on the Java side, while at the
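Until that bug is fixed, one possible guard is to treat Java's Long.MIN_VALUE as "no watermark yet". A small sketch (the sentinel value comes from the Java side; the helper name is hypothetical):

```python
JAVA_LONG_MIN = -2**63  # Long.MIN_VALUE on the Java side

def safe_watermark(wm):
    """Return None instead of the Long.MIN_VALUE sentinel that
    current_watermark() reports before any watermark has arrived."""
    return None if wm == JAVA_LONG_MIN else wm
```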

Re: Flink[Python] questions

2021-01-14 Thread Shuiqiang Chen
Hi Dc, Thank you for your feedback. 1. Currently, only built-in types are supported in the Python DataStream API; however, you can apply a Row type to represent a custom Python class as a workaround, where the field names stand for the names of the member variables and the field types stand for the types of the member

Re: FlinkKafkaConsumer question

2020-09-03 Thread Shuiqiang Chen
Hi, To guarantee exactly-once semantics in a Flink program, each Kafka source operator must itself maintain the consumption offsets of the partitions it consumes and write them into state at every checkpoint; on recovery from a checkpoint, consumption resumes from the last committed position, which guarantees exactly-once. If Kafka consumer-group management were used instead, the partitions assigned to each parallel instance of FlinkKafkaConsumer would be managed by Kafka's consumer group, and the offsets would be recorded by the Kafka group coordinator, so Flink could not maintain this information itself.
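The bookkeeping described above can be modeled in a few lines of plain Python — a toy model of the idea, not Flink's actual implementation:

```python
class CheckpointingSourceSketch:
    """Toy model: the source itself tracks per-partition offsets,
    snapshots them at checkpoints, and resumes from the last snapshot."""

    def __init__(self, partitions):
        self.offsets = {p: 0 for p in partitions}
        self.snapshots = []

    def consume(self, partition, n):
        self.offsets[partition] += n

    def checkpoint(self):
        # offsets are written into state at every checkpoint
        self.snapshots.append(dict(self.offsets))

    def restore(self):
        # recovery resumes from the last completed checkpoint
        self.offsets = dict(self.snapshots[-1])
```

Records consumed after the last checkpoint are re-read after a restore, which is exactly why the committed offsets, not Kafka's group state, are authoritative.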

Re: FlinkKafkaConsumer question

2020-09-03 Thread Shuiqiang Chen
Hi op, When Flink consumes from Kafka, the FlinkKafkaConsumer fetches all partition information for the topic from Kafka and assigns the partitions to the parallel consumers itself; the group id here is only used to commit the consumption offsets of the partitions back to Kafka, identified by that consumer group. Consuming with a plain KafkaConsumer, by contrast, relies on Kafka's consumer-group management, which is a role on the Kafka server side. Also, when two jobs consume Kafka with the same topic and group id, if the two jobs each use the same group id
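The key point — Flink distributes partitions across its parallel source instances itself rather than letting the group coordinator do it — can be sketched with a simplified modulo scheme (an illustration, not Flink's exact assignment formula):

```python
def assign_partitions(num_partitions, parallelism, subtask_index):
    """Deterministically assign topic partitions to one source subtask."""
    return [p for p in range(num_partitions)
            if p % parallelism == subtask_index]

# every partition goes to exactly one of the two subtasks
assignments = [assign_partitions(6, 2, i) for i in range(2)]
```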

Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-02 Thread Shuiqiang Chen
Hi jincheng, Thanks for the discussion. +1 for the FLIP. Well-organized documentation will greatly improve the efficiency and experience for developers. Best, Shuiqiang Hequn Cheng wrote on Sat, Aug 1, 2020 at 8:42 AM: > Hi Jincheng, > > Thanks a lot for raising the discussion. +1 for the FLIP. > > I

Re: Job Cluster on Kubernetes with PyFlink

2020-07-28 Thread Shuiqiang Chen
sname" to be "org.apache.flink.client.python.PythonDriver". 5. Adding '-pym {the_entry_module_of_your_pyflink_job}' to [job arguments]. Best, Shuiqiang Shuiqiang Chen wrote on Tue, Jul 28, 2020 at 5:55 PM: > Hi Wojciech, > > Currently, we are not able to deploy a job cluster for PyFlink jobs on > kube
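Put together, the job-cluster entrypoint invocation might look roughly like this — a sketch assembled from the steps above (the entry module name is hypothetical, and the exact entrypoint script depends on your image):

```shell
standalone-job.sh start-foreground \
  --job-classname org.apache.flink.client.python.PythonDriver \
  -pym my_job
```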

Re: Job Cluster on Kubernetes with PyFlink

2020-07-28 Thread Shuiqiang Chen
Hi Wojciech, Currently, we are not able to deploy a job cluster for PyFlink jobs on kubernetes, but it will be supported in release-1.12. Best, Shuiqiang

Re: Flink 1.11 startup problem

2020-07-21 Thread Shuiqiang Chen
Hi, You can check the JobManager log to see which resource request timed out, and compare the requested resource spec against the resources available in the cluster. Best, Shuiqiang 酷酷的浑蛋 wrote on Tue, Jul 21, 2020 at 4:37 PM: > > > I give up, why is this flink 1.11 startup nothing but problems > > > 1.7, 1.8 and 1.9 all work fine for me, but 1.11 doesn't > ./bin/flink run -m yarn-cluster -yqu root.rt_constant -ys 2 -yjm 1024 -yjm > 1024 -ynm sql_test ./examples/batch/WordCount.jar

Re: pyflink1.11.0window

2020-07-20 Thread Shuiqiang Chen
Check the exception message; perhaps your insert mode is not configured correctly. BTW, the text you pasted contains a lot of "" characters, which hurts readability. Best, Shuiqiang 奇怪的不朽琴师 <1129656...@qq.com> wrote on Mon, Jul 20, 2020 at 4:23 PM: > HI: > I now have a new problem: on top of this I added a join, and writing to kafka then fails as follows > Traceback (most recent call last): > File > "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py",

Re: pyflink1.11.0window

2020-07-15 Thread Shuiqiang Chen
The following example reads JSON-formatted data from Kafka, performs a window aggregation, and writes the results to ES; you can refer to the code structure and adapt the data fields accordingly. This code probably won't run as-is on your machine. from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings from pyflink.table.udf import udf

Re: pyflink1.11.0window

2020-07-14 Thread Shuiqiang Chen
Here is a SQL example: select platformcodetoname(payPlatform) as platform, sum(payAmount) as pay_amount, cast(tumble_start(rt, interval '5' seconds) as BIGINT) as rowtime from payment_msg group by tumble(rt, interval '5' seconds), payPlatform This query aggregates over each 5-second tumbling window. 奇怪的不朽琴师 <1129656...@qq.com> on Wed, Jul 15, 2020
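The tumbling-window semantics in that query can be illustrated with a plain-Python helper that maps an event timestamp to its window:

```python
def tumble_window(ts, size):
    """Return the [start, end) tumbling window that covers timestamp ts."""
    start = ts - (ts % size)
    return start, start + size

# with 5-second windows, second 12 falls into the window [10, 15)
window = tumble_window(12, 5)
```

Every timestamp lands in exactly one window, which is what lets the query group rows by tumble(rt, interval '5' seconds).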

Re: pyflink1.11.0window

2020-07-14 Thread Shuiqiang Chen
Hi Qinshi, The exception stack message org.apache.flink.table.api.ValidationException: A tumble window expects a size value literal suggests that the code defining the tumble window is not quite correct. Best, Shuiqiang 奇怪的不朽琴师 <1129656...@qq.com> wrote on Wed, Jul 15, 2020 at 10:27 AM: > Hello: > > I changed the source as you suggested in your reply, but now I get a new error. What causes this? I've been trying to debug a window without success; please help me, thank you. > Traceback

Re: Re: Upgrading from flink 1.10 to flink 1.11: submission to yarn fails

2020-07-09 Thread Shuiqiang Chen
Hi, It looks like the kafka table source was not created successfully. Perhaps you need to put the org.apache.flink flink-sql-connector-kafka_2.11 ${flink.version} jar into the FLINK_HOME/lib directory. Congxian Qiu wrote on Fri, Jul 10, 2020 at 10:57 AM: > Hi > > From the exception, it's probably caused by some package not being included, quite similar to [1]; you may need to compare which required package is missing. > > PS: from the stack it looks like csv

Re: pyflink1.11.0window

2020-07-09 Thread Shuiqiang Chen
Hi Qinshi, Does your source DDL declare time1 as a time attribute? create table source1( id int, time1 timestamp, type string, WATERMARK FOR time1 AS time1 - INTERVAL '2' SECOND ) with (...) 奇怪的不朽琴师 <1129656...@qq.com> wrote on Fri, Jul 10, 2020 at 8:43 AM: >
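The WATERMARK clause above defines a bounded-out-of-orderness watermark: the watermark trails the maximum event time seen so far by the fixed delay. A plain-Python sketch of that relationship:

```python
def bounded_watermark(max_event_time, delay):
    """time1 - INTERVAL '2' SECOND means: watermark = max event time - delay."""
    return max_event_time - delay

# with a 2-second delay, seeing event time 100 advances the watermark to 98
wm = bounded_watermark(100, 2)
```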