Job leak in attached mode (batch scenario)

2019-07-15 Thread qi luo
Hi guys, We run thousands of Flink batch jobs every day. The batch jobs are submitted in attached mode, so we can know from the client when a job finishes and then take further actions. To respond to user abort actions, we submit the jobs with "--shutdownOnAttachedExit" so the Flink cluster can
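
For reference, a minimal sketch of such an attached-mode submission (parallelism and jar name are placeholders):

    # shut the cluster down best-effort if the attached client exits
    flink run --shutdownOnAttachedExit -p 8 my-batch-job.jar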

Re: Creating a Source function to read data continuously

2019-07-15 Thread Biao Liu
Hi Soheil, I assume that you are using the `DataStream` API. Please check the documentation [1] to get more information. Other guys have said a lot about this. Regardless of the interfaces, I'm just wondering how you could read a MySQL table "continuously"? Kafka can be used as a message queue which is convenient

Re: Re: Creating a Source function to read data continuously

2019-07-15 Thread Haibo Sun
Hi Soheil, As Caizhi said, to create a source that implements `SourceFunction`, you can first take a closer look at the example in the Javadoc (https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html). Although `Input
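
For reference, a minimal Scala sketch in the spirit of that Javadoc example (the emitted counter and the sleep interval are placeholders):

    import org.apache.flink.streaming.api.functions.source.SourceFunction

    class CountSource extends SourceFunction[Long] {
      @volatile private var isRunning = true
      private var count = 0L

      override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
        while (isRunning) {
          // emit under the checkpoint lock so checkpoints see consistent state
          ctx.getCheckpointLock.synchronized {
            ctx.collect(count)
            count += 1
          }
          Thread.sleep(1000)
        }
      }

      override def cancel(): Unit = isRunning = false
    }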

Re: org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment

2019-07-15 Thread sri hari kali charan Tummala
This is the error: org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment. On Mon, Jul 15, 2019 at 9:54 PM Caizhi Weng wrote: > Hi Kali, > > What's the exception thrown or error message shown when executing the > erroneo
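
This cast usually means the table environment was created through the Java API while the surrounding code uses the Scala API. A minimal sketch of the Scala-side setup, assuming the Flink 1.8 method names (newer versions use StreamTableEnvironment.create):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala.StreamTableEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // passing the Scala StreamExecutionEnvironment yields a Scala StreamTableEnvironment
    val tEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)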

Re: Stream to CSV Sink with SQL Distinct Values

2019-07-15 Thread sri hari kali charan Tummala
Hi Lee, it writes only after the job is killed, and I don't see all the records. Is there a workaround? tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]) .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4", FileSystem.WriteMode.NO_OVERWR

Re: Creating a Source function to read data continuously

2019-07-15 Thread Caizhi Weng
Hi Soheil, It's not recommended to implement a streaming source using `InputFormat` (it's mainly used for batch sources). To implement a streaming source, `SourceFunction` is recommended. It's clearly written (with examples) in the Javadocs of `SourceFunction` how to write a `run` and `cancel` me

Re: Stream to CSV Sink with SQL Distinct Values

2019-07-15 Thread sri hari kali charan Tummala
Hi Lee, I did try option 1: it writes to the CSV file only if I kill the running job. tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]) .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut3", FileSystem.WriteMode.OVERWRITE,"~","|") Output: 2> (tr

Re: Stream to CSV Sink with SQL Distinct Values

2019-07-15 Thread JingsongLee
Hi Caizhi and Kali, I think this table should use toRetractStream instead of toAppendStream, and you should handle the retract messages. (If you just use distinct, the messages should always be accumulate messages.) Best, JingsongLee --
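
A minimal sketch of what handling the retract messages can look like (`tEnv` and `table` being the ones from the code earlier in this thread): each element of a retract stream is a (flag, row) pair, where the flag is true for accumulate messages and false for retractions:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    val retractStream: DataStream[(Boolean, Row)] = tEnv.toRetractStream[Row](table)
    // keep accumulate messages only; a real job would also react to retractions
    val inserts: DataStream[Row] = retractStream.filter(_._1).map(_._2)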

Re: Stream to CSV Sink with SQL Distinct Values

2019-07-15 Thread sri hari kali charan Tummala
Hi Weng, another issue now (Exception in thread "main" org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.). Here is the full code: https://github.com/kali786516/FlinkStreamAndSql/blob/15e5e60d6c044bc830f5ef2d79c071389e

Re: org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment

2019-07-15 Thread Caizhi Weng
Hi Kali, What's the exception thrown or error message shown when executing the erroneous step? Please paste it here so that we can investigate the problem. sri hari kali charan Tummala wrote on Tue, Jul 16, 2019 at 4:49 AM: > Hi, > > I am trying to write a Flink table to a streaming sink; it fails at casting >

Re: Stream to CSV Sink with SQL Distinct Values

2019-07-15 Thread Caizhi Weng
Hi Kali, Currently Flink treats all aggregate functions as retractable. As `distinct` is an aggregate function, the planner considers that it might update or retract records (although from my perspective it won't...). Because the CSV table sink is an append-only sink (it's hard to update what

Automatic deployment of new version of streaming stateful job

2019-07-15 Thread Maxim Parkachov
Hi, I'm trying to bring my first stateful streaming Flink job to production and have trouble understanding how to integrate it with a CI/CD pipeline. I can cancel the job with a savepoint, but in order to start the new version of the application do I need to specify the savepoint path manually? So, basically my qu
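
Not a complete answer, but a sketch of the usual flow a pipeline can script with the standard CLI (directory and job id are placeholders): `flink cancel -s` prints the savepoint path on success, which the pipeline captures and passes to the next run:

    # cancel the old version, writing a savepoint; the CLI prints its path
    flink cancel -s hdfs:///flink/savepoints <jobId>
    # start the new version from that savepoint
    flink run -d -s hdfs:///flink/savepoints/savepoint-XXXX new-version.jar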

Creating a Source function to read data continuously

2019-07-15 Thread Soheil Pourbafrani
Hi, By extending the "RichInputFormat" class I could create my own MySQL input. I want to use it to read data continuously from a table, but I observed that the "RichInputFormat" class reads all the data and finishes the job. I guess that to read data continuously I need to extend the "SourceFunction" bu

Fwd: org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment

2019-07-15 Thread sri hari kali charan Tummala
Hi, I am trying to write a Flink table to a streaming sink; it fails at casting Java to Scala or Scala to Java. It fails at the step below; can anyone help me out with this error? val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/sr
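
For context, a self-contained sketch of the sink construction being attempted here, assuming Flink 1.8's StreamingFileSink row-format API (path and encoder are placeholders):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    import org.apache.flink.types.Row

    val sink: StreamingFileSink[Row] = StreamingFileSink
      .forRowFormat(new Path("/tmp/flink-out"), new SimpleStringEncoder[Row]("UTF-8"))
      .build()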

Re: Flink SQL API: Extra columns added from order by

2019-07-15 Thread Morrisa Brenner
Hi Caizhi and Rong, Thanks for the responses! It's good to know that this is a known bug - right now we're just using Flink 1.8 and will work around this, but we look forward to getting the fixes in the future! All the best, Morrisa On Mon, Jul 15, 2019 at 2:25 AM Caizhi Weng wrote: > (Oops,

Fwd: Stream to CSV Sink with SQL Distinct Values

2019-07-15 Thread sri hari kali charan Tummala
Hi All, I am trying to read data from a Kinesis stream, apply a SQL transformation (distinct), and then write to a CSV sink, which is failing due to this issue (org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.). Full code is he

Converting Metrics from a Reporter to a Custom Events mapping

2019-07-15 Thread Vijay Balakrishnan
Hi, I need to capture the metrics sent from a Flink app to a Reporter and transform them into an Events API format I have designed. I have been looking at the Reporters (https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#list-of-all-variables) and have used them, but what w
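
In case it helps, the usual extension point for this is a custom reporter registered in flink-conf.yaml. A minimal Scala sketch against the `org.apache.flink.metrics.reporter.MetricReporter` interface (the Events API mapping is the hypothetical part):

    import org.apache.flink.metrics.{Metric, MetricConfig, MetricGroup}
    import org.apache.flink.metrics.reporter.MetricReporter

    class EventsApiReporter extends MetricReporter {
      // initialize the (hypothetical) Events API client from the reporter config
      override def open(config: MetricConfig): Unit = {}

      // flush and shut down the client
      override def close(): Unit = {}

      // map each registered metric and its scope variables to the custom event format
      override def notifyOfAddedMetric(metric: Metric, name: String, group: MetricGroup): Unit = {}

      override def notifyOfRemovedMetric(metric: Metric, name: String, group: MetricGroup): Unit = {}
    }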

Re: State incompatible

2019-07-15 Thread Avi Levi
Thanks Haibo, bummer ;) On Mon, Jul 15, 2019 at 12:27 PM Haibo Sun wrote: > Hi Avi Levi, > > I don't think there's any way to solve this problem right now, and the Flink > documentation clearly shows that this i

Re: State incompatible

2019-07-15 Thread Haibo Sun
Hi Avi Levi, I don't think there's any way to solve this problem right now, and the Flink documentation clearly shows that this is not supported: "Trying to restore state, which was previously configured without TTL, using TTL enabled descriptor or vice versa will lead to compatibility failure
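
For concreteness, a TTL-enabled descriptor looks like the sketch below (the state name and the TTL are placeholders, API as of Flink 1.8); restoring state originally written by the same descriptor without `enableTimeToLive`, or vice versa, triggers the failure above:

    import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
    import org.apache.flink.api.common.time.Time

    val ttlConfig = StateTtlConfig
      .newBuilder(Time.days(7))   // entries expire 7 days after the last update
      .build()

    val descriptor = new ValueStateDescriptor[String]("my-state", classOf[String])
    descriptor.enableTimeToLive(ttlConfig)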

Re: Even key distribution workload

2019-07-15 Thread Biao Liu
Hi Navneeth, The "keyBy" semantics must keep the data under the same key in the same task. So basically this data skew issue is caused by your data distribution. As far as I know, Flink cannot handle data skew very well. There is a proposal about local aggregation which is still under discussion in

Re: Even key distribution workload

2019-07-15 Thread Caizhi Weng
Hi Navneeth, Is it possible for you to first keyBy something other than user id (for example, message id), then aggregate the messages of the same user in the same keyed stream, and finally aggregate all the keyed streams to get a per-user result? Navneeth Krishnan wrote on Mon, Jul 15, 2019 at 2:38 PM: > H
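
A sketch of that two-step idea for a windowed per-user count (field names, the salt width, and the window size are placeholders): pre-aggregate on a salted key so one hot user is spread over several tasks, then merge the partial results per user:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    case class Msg(userId: String, messageId: String)

    def countPerUser(stream: DataStream[Msg]): DataStream[(String, Int, Long)] = {
      val partials = stream
        .map(m => (m.userId, math.abs(m.messageId.hashCode % 8), 1L)) // salt the key
        .keyBy(t => (t._1, t._2))        // (user, salt) spreads one user over 8 subtasks
        .timeWindow(Time.seconds(10))
        .reduce((a, b) => (a._1, a._2, a._3 + b._3))
      partials
        .keyBy(_._1)                     // second stage: merge partial counts per user
        .timeWindow(Time.seconds(10))
        .reduce((a, b) => (a._1, a._2, a._3 + b._3))
    }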