Re: sporadic "Insufficient no of network buffers" issue

2020-07-31 Thread Ivan Yang
Hi Rahul, Try increasing taskmanager.network.memory.max to 1GB, basically double what you have now. However, you only have 4GB of RAM for the entire TM, and a 1GB network buffer seems out of proportion to 4GB of total RAM. Reducing the amount of shuffling will require fewer network buffers. But if
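
A minimal flink-conf.yaml sketch of that suggestion (values illustrative, assuming Flink 1.8's network memory keys):

    taskmanager.network.memory.fraction: 0.15
    taskmanager.network.memory.min: 64mb
    # double the current upper bound, per the suggestion above
    taskmanager.network.memory.max: 1gb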

Re: Re: Flink SQL: INSERT syntax and unique primary key issues when writing from Kafka to MySQL

2020-07-31 Thread chenxuying
hi ok, thanks, got it, haha. At 2020-07-31 21:27:02, "Leonard Xu" wrote: >Hello > >> On 2020-07-31 at 21:13, chenxuying wrote: >> >> But I don't quite understand what "with the old options the primary key still has to be derived from the query" means. What do I need to do? > >In short, if you use the old (1.10) options, the code takes the same execution path as in 1.10, and 1.10 does not support declaring a PRIMARY KEY; >the write mode, upsert or append, is derived from the user's query,

allowNonRestoredState: metadata file in checkpoint dir missing

2020-07-31 Thread Deshpande, Omkar
Hello, When deleting an operator we run our application with --allowNonRestoredState=true, as described in the documentation. When running with this
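
For reference, a sketch of the CLI invocation this refers to (paths illustrative; the flag is spelled -n or --allowNonRestoredState in the Flink CLI):

    flink run -s s3://bucket/savepoints/savepoint-abc123 --allowNonRestoredState my-job.jar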

Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-07-31 Thread Hequn Cheng
Hi Jincheng, Thanks a lot for raising the discussion. +1 for the FLIP. I think this will bring big benefits for the PyFlink users. Currently, the Python TableAPI document is hidden deeply under the TableAPI tab which makes it quite unreadable. Also, the PyFlink documentation is mixed with

sporadic "Insufficient no of network buffers" issue

2020-07-31 Thread Rahul Patwari
Hi, We are observing the "Insufficient number of Network Buffers" issue sporadically after upgrading Flink from 1.4.2 to 1.8.2. The state of the tasks with this issue transitioned from DEPLOYING to FAILED. Whenever this issue occurs, the job manager restarts. Sometimes, the issue goes away after the

Re: [Flink Unit Tests] Unit test for Flink streaming codes

2020-07-31 Thread Vijayendra Yadav
Thank you Niels. Would you have something for the Scala object class? Say, for example, if I want to implement a unit test (not an integration test) for the code below or similar:

Re: [Flink Unit Tests] Unit test for Flink streaming codes

2020-07-31 Thread Niels Basjes
Does this test in one of my own projects do what you are looking for? https://github.com/nielsbasjes/yauaa/blob/1e1ceb85c507134614186e3e60952112a2daabff/udfs/flink/src/test/java/nl/basjes/parse/useragent/flink/TestUserAgentAnalysisMapperClass.java#L107 On Fri, 31 Jul 2020, 20:20 Vijayendra

Kafka source, committing and retries

2020-07-31 Thread Jack Phelan
Scenario === A partition that Flink is reading: [ 1 - 2 - 3 - 4 - 5 - 6 - 7 | 8 _ 9 _ 10 _ 11 | 12 ~ 13 ] [ committed | in flight | unread ] Kafka basically breaks off pieces from the end of the queue and shoves them downstream for processing? So suppose
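
For context, the "committed" boundary in Flink is driven by checkpoints rather than by Kafka handing data off. A minimal sketch (Flink 1.8-era universal connector; broker, group, and topic names are illustrative):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092"); // illustrative
    props.setProperty("group.id", "my-group");             // illustrative

    FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
    // With checkpointing enabled, offsets are committed back to Kafka
    // only when a checkpoint completes, not as records are read.
    consumer.setCommitOffsetsOnCheckpoints(true);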

[Flink Unit Tests] Unit test for Flink streaming codes

2020-07-31 Thread Vijayendra Yadav
Hi Team, Looking for some help and reference code/material to implement unit tests for possible scenarios in Flink *streaming* code that should assert specific cases. Regards, Vijay
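
A minimal sketch of the kind of test this asks for: exercising a user-defined function directly, without starting a cluster (JUnit 4 assumed; MyTokenizer is a hypothetical stateless FlatMapFunction, not code from the thread):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;
    import org.junit.Assert;
    import org.junit.Test;

    public class MyTokenizerTest {

        // Hypothetical UDF under test: splits a line into words.
        static class MyTokenizer implements FlatMapFunction<String, String> {
            @Override
            public void flatMap(String line, Collector<String> out) {
                for (String word : line.split(" ")) {
                    out.collect(word);
                }
            }
        }

        @Test
        public void testTokenizer() throws Exception {
            List<String> results = new ArrayList<>();
            // Collect output with a simple in-memory Collector instead of a sink.
            new MyTokenizer().flatMap("hello flink", new Collector<String>() {
                @Override public void collect(String record) { results.add(record); }
                @Override public void close() {}
            });
            Assert.assertEquals(Arrays.asList("hello", "flink"), results);
        }
    }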

Behavior for flink job running on K8S failed after restart strategy exhausted

2020-07-31 Thread Eleanore Jin
Hi Experts, I have a flink cluster (per job mode) running on kubernetes. The job is configured with the restart strategy restart-strategy.fixed-delay.attempts: 3 and restart-strategy.fixed-delay.delay: 10 s. So after 3 retries, the job will be marked as FAILED, hence the pods are not running.
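
The same settings spelled out in flink-conf.yaml form, together with the strategy selector:

    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 10 s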

Is there a way to get file "metadata" as part of stream?

2020-07-31 Thread John Smith
Hi, so I am reading a CSV file using env.readFile() with RowCsvInputFormat. Is there a way to get the filename as part of the row stream? The file contains a unique identifier to tag the rows with.
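
One possible sketch, an assumption rather than a confirmed answer from the thread: subclass the input format, capture the split's path in open(), and append it to each emitted Row (note the produced TypeInformation would also need an extra field):

    import java.io.IOException;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.io.RowCsvInputFormat;
    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.types.Row;

    // Hypothetical wrapper: emits each CSV row with the source file path appended.
    public class PathTaggingCsvInputFormat extends RowCsvInputFormat {
        private transient String currentPath;

        public PathTaggingCsvInputFormat(Path filePath, TypeInformation[] fieldTypes) {
            super(filePath, fieldTypes);
        }

        @Override
        public void open(FileInputSplit split) throws IOException {
            super.open(split);
            // Remember which file this split comes from.
            currentPath = split.getPath().toString();
        }

        @Override
        public Row nextRecord(Row reuse) throws IOException {
            Row row = super.nextRecord(reuse);
            if (row == null) {
                return null;
            }
            Row tagged = new Row(row.getArity() + 1);
            for (int i = 0; i < row.getArity(); i++) {
                tagged.setField(i, row.getField(i));
            }
            tagged.setField(row.getArity(), currentPath);
            return tagged;
        }
    }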

Re: How to stream CSV from S3?

2020-07-31 Thread John Smith
Hi Yes it works :) For the Java guys... final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String path = "file:///foo/bar"; TypeInformation[] fieldTypes = new TypeInformation[]{ BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO

JDBCOutputFormat dependency loading error

2020-07-31 Thread Flavio Pompermaier
Hi to all, I'm trying to run my DataSet job on Flink 1.11.0, and my code connects to MariaDB. I've put mariadb-java-client-2.6.0.jar in the lib directory, and in the pom.xml I set that dependency as provided. The code runs successfully from the IDE, but when I try to run the code on
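
One workaround sometimes suggested for JDBC driver classloading clashes (an assumption here, not a confirmed fix from the thread) is to force the driver package onto the parent classloader in flink-conf.yaml:

    # load the MariaDB driver classes parent-first instead of from the user jar
    classloader.parent-first-patterns.additional: org.mariadb.jdbc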

Re: Flink SQL: INSERT syntax and unique primary key issues when writing from Kafka to MySQL

2020-07-31 Thread Leonard Xu
Hello > On 2020-07-31 at 21:13, chenxuying wrote: > > But I don't quite understand what "with the old options the primary key still has to be derived from the query" means. What do I need to do? In short, if you use the old (1.10) options, the code takes the same execution path as in 1.10, and 1.10 does not support declaring a PRIMARY KEY; the write mode, upsert or append, is derived from the user's query. You can read the 1.10 documentation on deriving the write mode from the query [1]; if you are already on 1.11, you don't need the 1.10 docs.

Re: Re: Flink SQL: INSERT syntax and unique primary key issues when writing from Kafka to MySQL

2020-07-31 Thread chenxuying
Thanks for the answer. With the new properties I can successfully update records, but I don't quite understand what "with the old options the primary key still has to be derived from the query" means. What do I need to do? At 2020-07-31 16:46:41, "Leonard Xu" wrote: >Hi, chenxuying > >I see you are still using " 'connector.type' = 'jdbc', …. " >, which are the old options; with the old options the primary key still has to be derived from the query. >You need the new properties [1]: " 'connector' = 'jdbc', …. " to let the declared primary key determine upsert

Re: Flink SQL escape character problem

2020-07-31 Thread Leonard Xu
Hi, zilong. In SPLIT_INDEX(${xxx}, ';', 0) the semicolon ';' is not a special character and should not fail compilation. I tested this with DDL on Flink 1.11.1 and it works; I am not sure what your environment looks like. U&'\003B' is the Unicode escape for ';', so that form works too, but it is normally only needed when splitting on non-printable characters, e.g. \n is U&'\\000A' and \r is U&'\\000D'. For a printable character like the semicolon, no Unicode escape is necessary. Best, Leonard > On
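
The two working forms Leonard describes, side by side (column and table names illustrative):

    -- plain semicolon: works, no escaping needed
    SELECT SPLIT_INDEX(my_col, ';', 0) FROM my_table;

    -- Unicode escape form: mainly useful for non-printable delimiters
    SELECT SPLIT_INDEX(my_col, U&'\003B', 0) FROM my_table;  -- \003B is ';'
    SELECT SPLIT_INDEX(my_col, U&'\000A', 0) FROM my_table;  -- \000A is '\n'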

Re: Flink SQL escape character problem

2020-07-31 Thread Hannan Kan
The official documentation at https://help.aliyun.com/knowledge_detail/62544.html gives the signature VARCHAR SPLIT_INDEX(VARCHAR str, VARCHAR sep, INT index), where sep is a string type. Should you use double quotes, or check whether the semicolon is the ASCII one? -- Original mail -- From:

Re: Flink SQL escape character problem

2020-07-31 Thread zilong xiao
Writing it as U&'\003B' works, which feels odd... 李奇 <359502...@qq.com> wrote on Fri, Jul 31, 2020 at 8:25 PM: > Adding a backslash works: \; though the semicolon shouldn't be a special character. > > On Jul 31, 2020 at 8:13 PM, zilong xiao wrote: > > SPLIT_INDEX(${xxx}, ';', > > > 0): I want to split a string on semicolons, but the semicolon seems to be treated as a special character and the syntax check keeps failing; I read online that it can be escaped, but I couldn't work out how. Has anyone hit this? >

Re: Support for Event time clock specific to each stream in parallel streams

2020-07-31 Thread David Anderson
It sounds like you would like something like event-time-based windowing, but with independent watermarking for every key. An approach that can work, though it is somewhat cumbersome, is to not use watermarks or windows, but instead put all of the logic in a KeyedProcessFunction (or
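
A rough sketch of that idea (the class, the Event type with a long timestamp field, and the fixed-lag policy are all assumptions, not David's exact design): keep a per-key "event time clock" in keyed state, buffer elements, and release those older than the clock minus a lag:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class PerKeyEventTimeFunction extends KeyedProcessFunction<String, Event, Event> {
        private static final long LAG_MS = 60_000; // assumed per-key lateness bound
        private transient ValueState<Long> clock;  // per-key max timestamp seen so far
        private transient ListState<Event> buffer;

        @Override
        public void open(Configuration parameters) {
            clock = getRuntimeContext().getState(new ValueStateDescriptor<>("clock", Long.class));
            buffer = getRuntimeContext().getListState(new ListStateDescriptor<>("buffer", Event.class));
        }

        @Override
        public void processElement(Event e, Context ctx, Collector<Event> out) throws Exception {
            long current = clock.value() == null ? Long.MIN_VALUE : clock.value();
            long advanced = Math.max(current, e.timestamp);
            clock.update(advanced);
            buffer.add(e);
            // Release everything this key considers "on time"; keep the rest buffered.
            List<Event> stillBuffered = new ArrayList<>();
            for (Event buffered : buffer.get()) {
                if (buffered.timestamp <= advanced - LAG_MS) {
                    out.collect(buffered);
                } else {
                    stillBuffered.add(buffered);
                }
            }
            buffer.update(stillBuffered);
        }
    }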

Re: Flink SQL escape character problem

2020-07-31 Thread zilong xiao
I tested it; the backslash doesn't seem to work either. 李奇 <359502...@qq.com> wrote on Fri, Jul 31, 2020 at 8:25 PM: > Adding a backslash works: \; though the semicolon shouldn't be a special character. > > On Jul 31, 2020 at 8:13 PM, zilong xiao wrote: > > SPLIT_INDEX(${xxx}, ';', > > > 0): I want to split a string on semicolons, but the semicolon seems to be treated as a special character and the syntax check keeps failing; I read online that it can be escaped, but I couldn't work out how. Has anyone hit this? >

Re: Flink SQL escape character problem

2020-07-31 Thread 李奇
Adding a backslash works: \; though the semicolon shouldn't be a special character. > On Jul 31, 2020 at 8:13 PM, zilong xiao wrote: > > SPLIT_INDEX(${xxx}, ';', > 0): I want to split a string on semicolons, but the semicolon seems to be treated as a special character and the syntax check keeps failing; I read online that it can be escaped, but I couldn't work out how. Has anyone hit this?

Flink SQL escape character problem

2020-07-31 Thread zilong xiao
SPLIT_INDEX(${xxx}, ';', 0): I want to split a string on semicolons, but the semicolon seems to be treated as a special character and the syntax check keeps failing. I read online that it can be escaped, but I couldn't work out how. Has anyone run into a similar problem who can offer some pointers?

Re: Re: How to retain the column'name when convert a Table to DataStream

2020-07-31 Thread Jark Wu
Hi, For now, you can explicitly set the RowTypeInfo to retain the field names. This works in the master branch: val t1Stream = t1.toAppendStream[Row](t1.getSchema.toRowType) // t1 stream schema: Row(a: Integer, b: Integer) println(s"t1 stream schema: ${t1Stream.getType()}")

Re: Flink 1.11 submit job timed out

2020-07-31 Thread Matt Wang
I ran into the same problem; the job could only be submitted after starting the taskmanager-query-state-service.yaml service. Also, I was testing on a locally installed k8s cluster; if this were a GC problem, whether or not the TM service is started should make no difference. -- Best, Matt Wang On 07/27/2020 15:01, Yang Wang wrote: I suggest first setting heartbeat.timeout to a larger value, then printing the GC logs to see whether full GCs happen frequently and how long each one lasts. From the logs provided so far, the in-process JM->RM heartbeats are timing out; I still suspect this is GC-related
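
Yang Wang's two suggestions in flink-conf.yaml form (values illustrative; the GC flags shown are JDK 8 style):

    heartbeat.timeout: 120000   # default is 50000 ms; raise while debugging
    env.java.opts: "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps"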

Support for Event time clock specific to each stream in parallel streams

2020-07-31 Thread Sush Bankapura
Hi, We have a single Flink job that works on data from multiple data sources. These data sources are not aligned in time and also have intermittent connectivity lasting for days, due to which data will arrive late. We attempted to use event time and watermarks with parallel streams using

Re: Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer

2020-07-31 Thread Vikash Dat
Thanks for the reply. I am currently using 1.10, but I also saw it happen in 1.10.1 when experimenting. I have not tried 1.11, since EMR only has up to 1.10 at the moment. Are there any known workarounds? On Fri, Jul 31, 2020 at 02:42 Qingsheng Ren wrote: > Hi Vikash, > > It's a bug about

Re: Customization of execution environment

2020-07-31 Thread Aljoscha Krettek
I agree! My long-term goal is that a Configuration is the basis of truth and that the programmatic setter methods and everything else just modify the underlying configuration. We have made big steps in at least allowing configuration of most (if not all) StreamExecutionEnvironment and
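
As a sketch of what configuration-driven setup already looks like (Flink 1.11; the keys shown are illustrative), StreamExecutionEnvironment#configure applies supported settings from a Configuration onto the environment:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration conf = new Configuration();
    conf.setString("execution.checkpointing.interval", "30 s");
    conf.setString("pipeline.object-reuse", "true");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Applies the checkpointing and pipeline options from the configuration.
    env.configure(conf, Thread.currentThread().getContextClassLoader());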

Re:Re: How to retain the column'name when convert a Table to DataStream

2020-07-31 Thread izual
I created a JIRA issue here: https://issues.apache.org/jira/browse/FLINK-18782 And thanks for your advice to avoid 「top-level projection/rename」 ^_^ At 2020-07-30 16:58:45, "Dawid Wysakowicz" wrote: Hi, I am afraid you are facing an issue that was not checked for/was not considered. I

Re: How does RocksDBKeyedStateBackend write to disk

2020-07-31 Thread jun su
hi, I see that the RocksDBWriteBatchWrapper class has a flushIfNeeded() method; is it this one?

    private void flushIfNeeded() throws RocksDBException {
        boolean needFlush = batch.count() == capacity || (batchSize > 0 && getDataSize() >= batchSize);
        if (needFlush) {
            flush();
        }
    }

batchSize comes from state.backend.rocksdb.write-batch-size

Re: Flink SQL: INSERT syntax and unique primary key issues when writing from Kafka to MySQL

2020-07-31 Thread Leonard Xu
Hi, chenxuying. I see you are still using " 'connector.type' = 'jdbc', …. ", which are the old options; with the old options the primary key still has to be derived from the query. You need the new properties [1]: " 'connector' = 'jdbc', …. " to let the declared primary key determine the upsert mode. Best Leonard [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options
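
A sketch of the new-style options from [1], with an explicit primary key (table, column, and database names illustrative):

    CREATE TABLE mySink (
      a BIGINT,
      b BIGINT,
      PRIMARY KEY (a) NOT ENFORCED
    ) WITH (
      'connector' = 'jdbc',   -- new option style (Flink 1.11)
      'url' = 'jdbc:mysql://localhost:3306/mydb',
      'table-name' = 'my_table'
    );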

Re: How does RocksDBKeyedStateBackend write to disk

2020-07-31 Thread jiafu
The write buffer is flushed when it fills up; at checkpoint time a snapshot of RocksDB is taken, and the resulting sst files form the checkpoint data. ---- Original mail ---- From:

How does RocksDBKeyedStateBackend write to disk

2020-07-31 Thread jun su
hi all, when does RocksDBKeyedStateBackend serialize state to disk? At the window end time, or at the configured checkpoint interval? Thanks -- Best, Jun Su

Re: Flink SQL: INSERT syntax and unique primary key issues when writing from Kafka to MySQL

2020-07-31 Thread 李奇
Change to update mode, and you can also change the unique primary key to a natural key. > On Jul 31, 2020 at 4:13 PM, chenxuying wrote: > > hi > I am using Flink 1.11.0 > The code is as follows > StreamExecutionEnvironment streamEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv); >

Flink SQL: INSERT syntax and unique primary key issues when writing from Kafka to MySQL

2020-07-31 Thread chenxuying
hi I am using Flink 1.11.0. The code is as follows: StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv); tableEnvironment.executeSql(" " + " CREATE TABLE mySource ( " + " a bigint, " + " b bigint " + "

Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-07-31 Thread Marta Paes Moreira
Hi, Jincheng! Thanks for creating this detailed FLIP, it will make a big difference in the experience of Python developers using Flink. I'm interested in contributing to this work, so I'll reach out to you offline! Also, thanks for sharing some information on the adoption of PyFlink, it's great

Re: Colocating Compute

2020-07-31 Thread Dawid Wysakowicz
Hi Satyam, It should be fine to have an unbounded InputFormat. The important thing is not to produce more splits than there are parallel instances of your source. In createInputSplits(int minNumSplits), generate only minNumSplits, so that all splits can be assigned immediately. Unfortunately
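
A minimal sketch of that advice (only the split-creation method of a hypothetical custom InputFormat is shown; the other required methods are omitted):

    import org.apache.flink.core.io.GenericInputSplit;

    // Create exactly minNumSplits splits, so each parallel source instance
    // can be assigned one split immediately and none are left unassigned.
    @Override
    public GenericInputSplit[] createInputSplits(int minNumSplits) {
        GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
        for (int i = 0; i < minNumSplits; i++) {
            splits[i] = new GenericInputSplit(i, minNumSplits);
        }
        return splits;
    }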

EMR Saving CheckPoint into to S3 and 403 Access Denied

2020-07-31 Thread mars
Hi, I am running Flink jobs on EMR (5.30.1) and trying to save checkpoint info to S3. I have the following in the flink-conf.yaml file, and when I try to submit jobs to the flink cluster, the JobManager is failing as it is unable to save the checkpoint info to S3. s3.access-key: <>
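
For reference, a flink-conf.yaml sketch of that setup (placeholders kept; on EMR, an IAM role on the instances is a common alternative to static keys):

    s3.access-key: <your-access-key>
    s3.secret-key: <your-secret-key>
    state.checkpoints.dir: s3://my-bucket/flink/checkpoints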

Re: Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer

2020-07-31 Thread Qingsheng Ren
Hi Vikash, It's a bug in the classloader used by the `abortTransaction()` method in `FlinkKafkaProducer` in Flink version 1.10.0. I think it has been fixed in 1.10.1 and 1.11 according to FLINK-16262. Are you using Flink version 1.10.0? Vikash Dat wrote on Thu, Jul 30, 2020 at 9:26 PM: > Has anyone had success