How to debug window states

2020-07-14 Thread Paul Lam
Hi, since currently the State Processor API doesn't support window state, what's the recommended way to debug window states? The background is that I have a SQL job whose state, mainly window aggregation operator state, keeps increasing steadily. The things I've checked: - Outputs are as expected. -

Re: pyFlink 1.11 streaming job example

2020-07-14 Thread Manas Kale
Thank you Xingbo, this will certainly help! On Wed, Jul 15, 2020 at 7:39 AM Xingbo Huang wrote: > Hi Manas, > > I have created an issue[1] to add the related doc > > [1] https://issues.apache.org/jira/browse/FLINK-18598 > > Best, > Xingbo > > Manas Kale wrote on Tue, Jul 14, 2020 at 4:15 PM: > >> Thank you for the

Re: Flink 1.11 test Parquet sink

2020-07-14 Thread Jark Wu
I think this might be a bug in `tableEnv.fromValues`. Could you try removing the DataType parameter and let the framework derive the types?

    final Table inputTable = tableEnv.fromValues(
        Row.of(1L, "Hello"), //
        Row.of(2L, "Hello"), //
        Row.of(3L, ""), //
        Row.of(4L,
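
For comparison, PyFlink's from_elements does the same derivation when no data types are passed (a minimal sketch against PyFlink 1.11; field names are hypothetical):

    from pyflink.table import BatchTableEnvironment, EnvironmentSettings

    # Blink batch environment, PyFlink 1.11 style.
    settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
    t_env = BatchTableEnvironment.create(environment_settings=settings)

    # No explicit data types: the planner derives BIGINT and STRING from the values.
    input_table = t_env.from_elements(
        [(1, "Hello"), (2, "Hello"), (3, "")],
        ["col1", "col2"])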

Re: Flink 1.11 test Parquet sink

2020-07-14 Thread Leonard Xu
Hi Flavio, I reproduced your issue, and I think it should be a bug. But I'm not sure whether it comes from Calcite itself or from Flink's shaded Calcite (the Flink Table Planner module shades Calcite). Maybe Danny can help explain more. CC: Danny Best, Leonard Xu > On Jul 14, 2020, at 23:06, Flavio Pompermaier wrote: > > If I

Re: Flink Kafka connector in Python

2020-07-14 Thread Xingbo Huang
Hi Manas, if you want to return a RowType in a Python UDF, you can use the Row class, which extends Python's tuple. You can use the following statement to import Row: from pyflink.table import Row Best, Xingbo Manas Kale wrote on Mon, Jul 6, 2020 at 8:08 PM: > I also tried doing this by using a User Defined Func
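
A minimal sketch of a Python UDF returning a Row, assuming PyFlink 1.11 (the UDF and its field names are hypothetical):

    from pyflink.table import DataTypes, Row
    from pyflink.table.udf import udf

    # Split a "key:value" string into a two-field row; Row (a tuple
    # subclass) carries both values back to the framework.
    @udf(input_types=[DataTypes.STRING()],
         result_type=DataTypes.ROW([DataTypes.FIELD("k", DataTypes.STRING()),
                                    DataTypes.FIELD("v", DataTypes.STRING())]))
    def split_kv(s):
        k, v = s.split(":", 1)
        return Row(k, v)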

Re: pyFlink 1.11 streaming job example

2020-07-14 Thread Xingbo Huang
Hi Manas, I have created an issue[1] to add the related doc [1] https://issues.apache.org/jira/browse/FLINK-18598 Best, Xingbo Manas Kale wrote on Tue, Jul 14, 2020 at 4:15 PM: > Thank you for the quick reply Xingbo! > Is there some documented webpage example that I can refer to in the > future for the latest p

Re: Mongodb_sink

2020-07-14 Thread lec ssmi
First of all, mongodb itself does not support transactions. But you can initialize a buffer to save each row when the transaction begins, and save all the buffered records to the database at once when the transaction completes. C DINESH wrote on Wed, Jul 15, 2020 at 9:28 AM: > Hello all, > > Can we implement TwoPhaseCommitPro
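
A plain-Python sketch of that buffer-then-flush idea (Flink's actual TwoPhaseCommitSinkFunction is a Java API; the class and method names below are hypothetical and only mirror its shape):

    class BufferingMongoSink:
        """Stage rows per transaction; write to MongoDB only on commit."""

        def __init__(self, collection):
            self.collection = collection  # e.g. a pymongo collection
            self.buffer = []

        def begin_transaction(self):
            self.buffer = []              # fresh buffer for the new transaction

        def invoke(self, row):
            self.buffer.append(row)       # staged only; nothing hits MongoDB yet

        def commit(self):
            if self.buffer:
                self.collection.insert_many(self.buffer)  # one bulk write on commit
            self.buffer = []

        def abort(self):
            self.buffer = []              # drop staged rows on failure

The usual caveat: if the job fails after the bulk write but before the checkpoint completes, the batch can be written again on recovery, so without real sink-side transactions this gives effectively at-least-once rather than exactly-once.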

Re: Pyflink JavaPackage Error

2020-07-14 Thread Xingbo Huang
Hi Jesse, for how to add jar packages, you can refer to the Common Questions doc[1] of PyFlink. PyFlink 1.10 and 1.11 differ in how jar packages are added, which the document introduces in detail. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/p
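
In PyFlink 1.11, for example, connector jars can be registered through the pipeline.jars option (a hedged sketch; the jar path is hypothetical):

    # Point the table environment at the connector jar(s);
    # multiple paths are separated by semicolons.
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///path/to/flink-sql-connector-kafka_2.11-1.11.0.jar")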

Mongodb_sink

2020-07-14 Thread C DINESH
Hello all, Can we implement the TwoPhaseCommitProtocol for MongoDB to get EXACTLY_ONCE semantics? Will there be any limitations? Thanks, Dinesh.

ERROR submitting a flink job

2020-07-14 Thread Aissa Elaffani
Hello Guys, I am trying to launch a Flink app on a remote server, but I get this error message. org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 8bf7f299746e051ea7b94afd07e2

RestartStrategy failure count when losing a Task Manager

2020-07-14 Thread Jiahui Jiang
Hello Flink, I have some questions regarding the guidelines on configuring a restart strategy. I was testing a job with the following setup: 1. There are many tasks, but currently I'm running with a parallelism of only 2 and plenty of task slots (4 TMs with 4 task slots each). 2. It's ran
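
For reference, a restart strategy can also be set programmatically rather than in flink-conf.yaml; a hedged PyFlink sketch with hypothetical numbers:

    from pyflink.common import RestartStrategies
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    # Failure-rate strategy: at most 3 failures within a 5-minute window,
    # waiting 10 seconds between restart attempts (both intervals in ms).
    env.set_restart_strategy(
        RestartStrategies.failure_rate_restart(3, 300000, 10000))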

Re: Map type param escaping :

2020-07-14 Thread Bohinski, Kevin
Figured it out, pulled StructuredOptionsSplitter into a debugger and was able to get it working with: -Dkubernetes.jobmanager.annotations="\"KEY:\"\"V:A:L:U:E\"\"\"" Best kevin

Pyflink JavaPackage Error

2020-07-14 Thread Jesse Lord
I am trying to read from kafka using the pyflink Table API on EMR 5.30, Flink 1.10 (and I have reproduced the error with 1.11). I am getting the following error using either `flink run -py` or pyflink-shell.sh (the error message below was generated in the pyflink shell): >>> Kafka() Traceb

Map type param escaping :

2020-07-14 Thread Bohinski, Kevin
Hi, How do we escape `:` in a map type param? For example, we have: -Dkubernetes.jobmanager.annotations=KEY:V:A:L:U:E which should result in {"KEY": "V:A:L:U:E"}. Best, kevin

Backpressure on Window step

2020-07-14 Thread Nelson Steven
Hello! We are experiencing occasional backpressure on a Window function in our pipeline. The window is on a KeyedStream and is triggered by an EventTimeSessionWindows.withGap(Time.seconds(30)). The prior step does a fanout and we use the window to sort things into batches based on the Key for t

Re: Missing jars

2020-07-14 Thread Daves Open
Thanks for your help. I had to create a dependency like this to reference the flink-avro module found in the flink-formats pom. Learn something new every day.

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro</artifactId>
      <version>1.11.0</version>
      <scope>provided</scope>
    </dependency>

On Tue, Jul 14, 2020 at 11:43 AM Chesnay Schepler wrote: > flink-formats is a pom

Re: Flink DataSet Iterate updating additional variable

2020-07-14 Thread Khachatryan Roman
Hi Antonio, Yes, you are right. Revisiting your question, I'm wondering whether it's possible to partition speeds and nodes in the same way (stably across iterations)? (I'm assuming a distributed setup) If not, each iteration would have to wait for *all* subtasks of the previous iteration to finis

Re: Missing jars

2020-07-14 Thread Chesnay Schepler
flink-formats is a pom artifact, meaning that there are no jars for it. You should add a dependency on the specific format(s) you are interested in, like flink-csv. On 14/07/2020 17:41, Daves Open wrote: I added flink-formats to my pom.xml file, but the jar files are not found. I che

Missing jars

2020-07-14 Thread Daves Open
I added flink-formats to my pom.xml file, but the jar files are not found. I checked in mvnrepository and the pom entry exists, but there are no files. Can you please confirm that the jars for flink-formats are available in the repos? The following is my pom entry: org.apache.flink flink

Re: Flink 1.11 test Parquet sink

2020-07-14 Thread Flavio Pompermaier
If I use

    final Table inputTable = tableEnv.fromValues(
        DataTypes.ROW(
            DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
            DataTypes.FIELD("col2", DataTypes.STRING().notNull())
        ), ..

    tableEnv.executeSql(//
        "CREATE TABLE `out` (" +
        "co

Re: Flink 1.11 test Parquet sink

2020-07-14 Thread Flavio Pompermaier
Sorry, obviously "'format' = 'parquet'" + is without the comment :D On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier wrote: > Hi to all, > I'm trying to test writing to Parquet using the following code but I have an > error: > > final TableEnvironment tableEnv = > DatalinksExecutionEnvironment.ge

Flink 1.11 test Parquet sink

2020-07-14 Thread Flavio Pompermaier
Hi to all, I'm trying to test writing to Parquet using the following code but I have an error:

    final TableEnvironment tableEnv = DatalinksExecutionEnvironment.getBatchTableEnv();
    final Table inputTable = tableEnv.fromValues(//
        DataTypes.ROW(//
            DataTypes.FIELD("col1", DataTyp
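
A hedged PyFlink sketch of the same experiment through a filesystem DDL sink (the path and names are hypothetical):

    # Filesystem sink in Parquet format, Flink 1.11 DDL.
    t_env.execute_sql("""
        CREATE TABLE `out` (
            col1 STRING,
            col2 STRING
        ) WITH (
            'connector' = 'filesystem',
            'path' = 'file:///tmp/parquet-out',
            'format' = 'parquet'
        )
    """)
    # execute_insert submits asynchronously in 1.11; block so the
    # script does not exit before the write finishes.
    input_table.execute_insert("out") \
        .get_job_client().get_job_execution_result().result()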

Re: DDL TIMESTAMP(3) parsing issue

2020-07-14 Thread Manas Kale
Thanks for the quick replies Dawid and Leonard... I had both flink-json JARs, for 1.10 and 1.11. I deleted the 1.10 one and now it works! On Tue, Jul 14, 2020 at 4:17 PM Leonard Xu wrote: > Hi Kale, > > I think you're using the correct TIMESTAMP data type in the JSON format, and this > should work properly. > But

Key group is not in KeyGroupRange

2020-07-14 Thread Ori Popowski
I'm getting this error when creating a savepoint. I've read in https://issues.apache.org/jira/browse/FLINK-16193 that it's caused by unstable hashcode or equals on the key, or improper use of reinterpretAsKeyedStream. My key is a string and I don't use reinterpretAsKeyedStream, so what's going on?

Re: DDL TIMESTAMP(3) parsing issue

2020-07-14 Thread Leonard Xu
Hi Kale, I think you're using the correct TIMESTAMP data type in the JSON format, and this should work properly. But it looks like you're using an old version of the `flink-json` dependency, judging from the log. Could you check whether the version of `flink-json` is 1.11.0? Best, Leonard Xu > On Jul 14, 2020, at 18:07, Manas Kale wrote: >

Re: DDL TIMESTAMP(3) parsing issue

2020-07-14 Thread Dawid Wysakowicz
Forget my previous message. This is most probably some class conflict. The SQL_TIMESTAMP_FORMAT field was added in 1.11. It looks as if you were using an old version of the TimeFormats class from an earlier version of Flink. Best, Dawid On 14/07/2020 12:07, Manas Kale wrote: > Hi, > I am trying to

Re: DDL TIMESTAMP(3) parsing issue

2020-07-14 Thread Dawid Wysakowicz
Can you try changing the precision to 6, or try changing the format in JSON to produce only 3 fractional digits? As described in the JSON docs[1], the expected default format for timestamp is: yyyy-MM-dd HH:mm:ss.s{precision} Best, Dawid [1] https://ci.apache.org/projects/flink/flink-docs-release-

DDL TIMESTAMP(3) parsing issue

2020-07-14 Thread Manas Kale
Hi, I am trying to parse this JSON message: {"monitorId": 789, "deviceId": "abcd", "data": 144.0, "state": 2, "time_st": "2020-07-14 15:15:19.60"} using pyFlink 1.11 DDL with this code:

    ddl_source = f"""
        CREATE TABLE {INPUT_TABLE} (
            `monitorId` STRING,
            `deviceId` STRING,
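
For context, a filled-out sketch of the kind of DDL in play, with TIMESTAMP(3) for the time_st column (connector options hypothetical; per the replies above, the eventual fix was removing a stale 1.10 flink-json jar rather than changing the DDL):

    INPUT_TABLE = "monitor_data"
    ddl_source = f"""
        CREATE TABLE {INPUT_TABLE} (
            `monitorId` STRING,
            `deviceId` STRING,
            `data` DOUBLE,
            `state` INT,
            `time_st` TIMESTAMP(3)
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'monitor_topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
    """
    t_env.execute_sql(ddl_source)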

Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-14 Thread David Magalhães
Hi Congxian, sorry for the late reply. I'm using the filesystem with an S3 path as the default state backend in flink-conf.yaml (state.backend: filesystem). The Flink version I'm using is 1.10.1. By "The task manager did not clean up the state", I mean what the taskmanager was writing on disk the

Re: pyFlink 1.11 streaming job example

2020-07-14 Thread Manas Kale
Thank you for the quick reply Xingbo! Is there some documented webpage example that I can refer to in the future for the latest pyFlink 1.11 API? I couldn't find anything related to awaiting asynchronous results. Thanks, Manas On Tue, Jul 14, 2020 at 1:29 PM Xingbo Huang wrote: > Hi Manas, > >

Re: pyFlink 1.11 streaming job example

2020-07-14 Thread Xingbo Huang
Hi Manas, I tested your code, but there are no errors. Because execute_sql is an asynchronous method, you need to await the result through the TableResult. You can try the following code:

    from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
    from pyflink.table import StreamTableEnviron
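
The blocking pattern being described looks roughly like this in PyFlink 1.11 (a hedged sketch; table names are hypothetical):

    # execute_sql submits the INSERT job asynchronously and returns a
    # TableResult; block on its job client so the script waits for the job.
    table_result = t_env.execute_sql(
        "INSERT INTO sink_table SELECT * FROM source_table")
    table_result.get_job_client().get_job_execution_result().result()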

Re: Flink DataSet Iterate updating additional variable

2020-07-14 Thread Antonio Martínez Carratalá
Hi Roman, thank you for your quick reply, but as far as I know broadcast variables cannot be written to. My problem is that I need to update the value of the speed variable to use it in the next iteration. Iterate only has one input dataset and propagates it to the next iteration using closeWith(),

Re: PyFlink Table API - "A group window expects a time attribute for grouping in a stream environment."

2020-07-14 Thread Xingbo Huang
Hi Manas, yes, this is a bug which I have also encountered in the Descriptor API, but I couldn't find the corresponding issue. You can create an issue to report this problem. There are similar bugs in the current Descriptor API, so DDL is the more recommended way. Now the community has started a discuss
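
A hedged sketch of the recommended DDL route: declaring a watermark in DDL gives the table an event-time attribute that group windows can use (names and connector options hypothetical):

    t_env.execute_sql("""
        CREATE TABLE events (
            `id` STRING,
            `ts` TIMESTAMP(3),
            WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'events',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
    """)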

pyFlink 1.11 streaming job example

2020-07-14 Thread Manas Kale
Hi, I am trying to get a simple streaming job running in pyFlink and understand the new 1.11 API. I just want to read from and write to Kafka topics. Previously I was using t_env.execute("jobname"), register_table_source() and register_table_sink(), but in 1.11 all 3 were deprecated and replaced by