Re: flink requires table key when insert into upsert table sink

2018-08-10 Thread 徐涛
Hi Fabian, Could you give an example of a query that has a unique key? What is the mechanism by which Flink infers which field is the unique key(s)? Thanks a lot! Best, Henry > On Aug 11, 2018, at 5:21 AM, Fabian Hueske wrote: > > Hi Henry, > > The problem is that the table that results from

Re: Yahoo Streaming Benchmark on a Flink 1.5 cluster

2018-08-10 Thread Naum Gjorgjeski
Hi all, I still cannot resolve the problem. Can I please get any advice on it? Thank you. Best regards, Naum Gjorgjeski From: Naum Gjorgjeski Sent: Wednesday, August 1, 2018 1:05 AM To: user@flink.apache.org Subject: Yahoo Streaming Benchmark on a Flink

flink requires table key when insert into upsert table sink

2018-08-10 Thread 徐涛
Hi All, I am using Flink 1.6 to build some realtime programs. I want to write the output to a table sink; the code is as below. At first I used an append table sink, whose error message told me that I should use an upsert table sink, so I wrote one. But still another error “Caused by:

Re: flink requires table key when insert into upsert table sink

2018-08-10 Thread Hequn Cheng
Hi, *> Could you give an example that the query has a unique key?* Consider the following SQL: SELECT a, SUM(b) AS d FROM Orders GROUP BY a. The result table has a unique key on column a. A document about Streaming Concepts[1] may be helpful for you. *> What is the mechanism flink infer
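Hequn's GROUP BY example can be sketched as a small simulation (plain Python, not the Flink API; the `orders` data and helper names are made up for illustration): each incoming row produces an upsert message keyed by the grouping column, so the materialized result table has exactly one row per value of `a`.

```python
# Illustrative sketch of the upsert stream produced by
# "SELECT a, SUM(b) AS d FROM Orders GROUP BY a".
# The GROUP BY column 'a' becomes the unique key of the result table.

def upsert_stream(rows):
    """Yield (key, aggregate) upsert messages as rows arrive."""
    sums = {}
    for a, b in rows:
        sums[a] = sums.get(a, 0) + b
        yield (a, sums[a])          # upsert message keyed by 'a'

orders = [("x", 1), ("y", 2), ("x", 3)]   # hypothetical input rows (a, b)
messages = list(upsert_stream(orders))

# Each new row for key "x" overwrites the previous result for "x",
# so the materialized table holds exactly one row per key.
final_table = dict(messages)
print(final_table)  # {'x': 4, 'y': 2}
```

This is why a query without such a grouping key cannot feed an upsert sink: there is no column by which later messages can overwrite earlier ones.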

Re: Yahoo Streaming Benchmark on a Flink 1.5 cluster

2018-08-10 Thread vino yang
Hi Naum, I don't think you need to pay attention to the internals of the Flink API. Its interface is backward compatible. If you update the dependent version of the API and the corresponding version of the Flink system so that their versions are consistent, there should be no problems. Please

Re: Dataset.distinct - Question on deterministic results

2018-08-10 Thread Will Bastian
Fabian, Thanks for the clear response. You addressed my question, and the suggestions provide clear context on how to address. Best, Will On Fri, Aug 10, 2018 at 5:52 AM Fabian Hueske wrote: > Hi Will, > > The distinct operator is implemented as a groupBy(distinctKeys) and a > ReduceFunction

Re: flink requires table key when insert into upsert table sink

2018-08-10 Thread Fabian Hueske
Hi Henry, The problem is that the table that results from the query does not have a unique key. You can only use an upsert sink if the table has a (composite) unique key. Since this is not the case, you cannot use an upsert sink. However, you can implement a RetractStreamTableSink which allows to
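The retraction alternative Fabian mentions can be sketched as follows (a toy Python model, not Flink code; the running-sum example is assumed): instead of overwriting by key, every update is encoded as a pair of messages, a retraction of the old row and an addition of the new one, so no unique key is required.

```python
# Toy model of a retraction stream: (True, row) adds a row,
# (False, row) retracts a previously emitted row.

def retract_stream(rows):
    """Emit add/retract messages for a running per-key sum."""
    sums = {}
    for a, b in rows:
        if a in sums:
            yield (False, (a, sums[a]))   # retract the previous result
        sums[a] = sums.get(a, 0) + b
        yield (True, (a, sums[a]))        # add the new result

def materialize(messages):
    """Apply add/retract messages to rebuild the result table."""
    table = []
    for is_add, row in messages:
        if is_add:
            table.append(row)
        else:
            table.remove(row)
    return table

msgs = list(retract_stream([("x", 1), ("x", 3)]))
print(msgs)
# [(True, ('x', 1)), (False, ('x', 1)), (True, ('x', 4))]
print(materialize(msgs))  # [('x', 4)]
```

The sink only needs to apply adds and retracts in order; it never has to identify rows by key.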

Re: SQL parallelism setting

2018-08-10 Thread Timo Walther
Hi, currently, you can only set the parallelism for an entire Flink job using env.setParallelism(). There are rough ideas of how we could improve the situation in the future to control the parallelism of individual operators, but this might need one or two releases. Regards, Timo Am

Re: Small-files source - partitioning based on prefix of file

2018-08-10 Thread Fabian Hueske
Hi Averell, One comment regarding what you said: > As my files are small, I think there would not be much benefit in checkpointing file offset state. Checkpointing is not about efficiency but about consistency. If the position in a split is not checkpointed, your application won't operate with
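Fabian's point that offset checkpointing is about consistency, not efficiency, can be shown with a small simulation (plain Python; the split contents and state shape are made up): after a failure, restoring with the checkpointed offset resumes exactly where the checkpoint was taken, while restoring without it re-reads the split and duplicates records.

```python
# Sketch of why the read position must be part of the checkpoint.

def read_split(lines, start_offset=0):
    """Read a file split starting at a given record offset."""
    for i in range(start_offset, len(lines)):
        yield i, lines[i]

lines = ["r0", "r1", "r2", "r3"]

# Checkpoint taken after two records: state = (records seen, offset).
seen, checkpointed_offset = ["r0", "r1"], 2

# Restore WITH the offset: processing resumes exactly where it stopped.
resumed = seen + [rec for _, rec in read_split(lines, checkpointed_offset)]
print(resumed)      # ['r0', 'r1', 'r2', 'r3']  -> exactly-once

# Restore WITHOUT the offset: the whole split is read again.
duplicated = seen + [rec for _, rec in read_split(lines, 0)]
print(duplicated)   # ['r0', 'r1', 'r0', 'r1', 'r2', 'r3'] -> duplicates
```

Even for small files, the second run produces a different (wrong) result, which is the consistency problem checkpointed offsets prevent.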

Re: flink telemetry/metrics

2018-08-10 Thread Chesnay Schepler
What is wrong with the metrics that are shown in graphite? Can you provide us with the metrics section of your flink-conf.yaml? Are there any metric-related warnings in the TaskManager logs? On 09.08.2018 01:38, John O wrote: I’m working on getting a flink job into production. As part of the
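For reference, a minimal Graphite reporter section in flink-conf.yaml looks roughly like the following sketch (the reporter name `grph` and the host/port values are placeholders to adapt to your setup):

```yaml
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
metrics.reporter.grph.interval: 60 SECONDS
```

Comparing this shape against the actual configuration (and the TaskManager logs) is usually the fastest way to spot why metrics are missing or malformed in Graphite.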

SQL parallelism setting

2018-08-10 Thread Shu Li Zheng
Hi community, Is there a way to change parallelism on sqlQuery()? Regards, Shu li Zheng

Re: Small-files source - partitioning based on prefix of file

2018-08-10 Thread Jörn Franke
Or you write a custom file system for Flink... (for the tar part). Unfortunately, gz files can only be processed single-threaded (there are some multi-threaded implementations, but they don’t bring a big gain). > On 10. Aug 2018, at 07:07, vino yang wrote: > > Hi Averell, > > In this case,
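Jörn's point about tar.gz being inherently sequential can be illustrated with the standard library (a self-contained sketch; the archive is built in memory with made-up file names): the tar members are only reachable by decompressing the single gzip stream front to back, so one reader thread per archive is the natural unit of parallelism.

```python
# Sketch: a .tar.gz can only be scanned sequentially, because the
# gzip stream must be decompressed in order to reach each tar member.
import io
import tarfile

# Build a small in-memory tar.gz with two members.
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tar:
    for name, payload in [("a.txt", b"alpha"), ("b.txt", b"beta")]:
        info = tarfile.TarInfo(name)
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))

# Reading: members arrive one after another from the compressed stream.
buf.seek(0)
names = []
with tarfile.open(fileobj=buf, mode="r:gz") as tar:
    for member in tar:          # sequential scan over the gzip stream
        names.append(member.name)
print(names)  # ['a.txt', 'b.txt']
```

Parallelism therefore comes from processing many archives concurrently, not from splitting one archive.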

Re: Small-files source - partitioning based on prefix of file

2018-08-10 Thread Fabian Hueske
Hi Averell, Conceptually, you are right. Checkpoints are taken at every operator at the same "logical" time. It is not important that each operator checkpoints at the same wall-clock time. Instead, they need to take a checkpoint when they have processed the same input. This is implemented with
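The "same logical time" idea can be sketched with a toy barrier model (plain Python, not Flink's implementation): a checkpoint barrier is injected into the stream, and an operator snapshots its state exactly when it reaches the barrier, regardless of how fast it runs in wall-clock terms.

```python
# Toy model of barrier-based snapshotting: the snapshot point is
# determined by the barrier's position in the stream, not by time.

BARRIER = object()   # sentinel marking the checkpoint position

def run(stream):
    count = 0          # operator state: number of records processed
    snapshots = []
    for element in stream:
        if element is BARRIER:
            snapshots.append(count)   # checkpoint at this logical point
        else:
            count += 1
    return snapshots

# Any operator consuming this stream snapshots after exactly 3 records,
# no matter how slowly or quickly it processes them.
stream = ["a", "b", "c", BARRIER, "d", "e"]
snapshots = run(stream)
print(snapshots)   # [3]
```

Because every operator sees the barrier at the same position in its input, all snapshots belong to the same consistent logical point.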

Re: JDBCInputFormat and SplitDataProperties

2018-08-10 Thread Fabian Hueske
Can you share the plan for the program? Are you sure that more than 1 split is generated by the JdbcInputFormat? 2018-08-10 12:04 GMT+02:00 Alexis Sarda : > It seems I may have spoken too soon. After executing the job with more > data, I can see the following things in the Flink dashboard: > >

Re: UTF-16 support for TextInputFormat

2018-08-10 Thread Fabian Hueske
Hi David, Thanks for digging into the code! I had a quick look into the classes as well. As far as I can see, your analysis is correct and the BOM handling in DelimitedInputFormat and TextInputFormat (and other text-based IFs such as CsvInputFormat) is broken. In fact, it's obvious that nobody
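The missing BOM handling amounts to something like the following sketch (Python for illustration, not the Java input formats; `detect_bom` is a made-up helper): inspect the first bytes of a split, pick the decoder accordingly, and skip the BOM bytes before reading records.

```python
# Sketch of BOM detection: choose the charset (and BOM length) from
# the leading byte order mark, falling back to a configured default.
import codecs

def detect_bom(data: bytes):
    """Return (encoding, bom_length) based on a leading byte order mark."""
    if data.startswith(codecs.BOM_UTF8):
        return "utf-8-sig", len(codecs.BOM_UTF8)
    if data.startswith(codecs.BOM_UTF16_LE):
        return "utf-16-le", len(codecs.BOM_UTF16_LE)
    if data.startswith(codecs.BOM_UTF16_BE):
        return "utf-16-be", len(codecs.BOM_UTF16_BE)
    return "utf-8", 0   # no BOM: use a configured default

raw = codecs.BOM_UTF16_LE + "hello".encode("utf-16-le")
encoding, bom_len = detect_bom(raw)
decoded = raw[bom_len:].decode(encoding)
print(encoding, decoded)  # utf-16-le hello
```

One subtlety the real fix must handle: only the first split of a file carries the BOM, so the detected charset has to be propagated to the other splits.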

Re: Flink Rebalance

2018-08-10 Thread antonio saldivar
Hi Fabian, Thank you. Yes, there are just map functions; I will do it that way with methods to get it faster. On Fri, Aug 10, 2018, 5:58 AM Fabian Hueske wrote: > Hi, > > Elias and Paul have good points. > I think the performance degradation is mostly due to the lack of function > chaining in the

Re: Small-files source - partitioning based on prefix of file

2018-08-10 Thread Averell
Thank you Vino, Jorn, and Fabian. Please forgive me for my ignorance, as I am still not able to fully understand state/checkpointing and the statement that Fabian gave earlier: "In either case, some record will be read twice but if reading position can be reset, you can still have exactly-once

Re: Dataset.distinct - Question on deterministic results

2018-08-10 Thread Fabian Hueske
Hi Will, The distinct operator is implemented as a groupBy(distinctKeys) and a ReduceFunction that returns the first argument. Hence, it depends on the order in which the records are processed by the ReduceFunction. Flink does not maintain a deterministic order because it is quite expensive in
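Fabian's description of distinct as groupBy plus a keep-the-first reduce can be sketched directly (plain Python, not the DataSet API; the records and key function are made up): when records carry fields outside the distinct key, the surviving record depends on arrival order, which Flink does not make deterministic.

```python
# Sketch: distinct as groupBy(key) + reduce that keeps the first
# record seen per key -- hence order-dependent for non-key fields.

def distinct_on(records, key):
    first_seen = {}
    for rec in records:
        first_seen.setdefault(key(rec), rec)   # keep the first per key
    return list(first_seen.values())

# Same records, different arrival order -> different survivor.
run1 = distinct_on([("k", 1), ("k", 2)], key=lambda r: r[0])
run2 = distinct_on([("k", 2), ("k", 1)], key=lambda r: r[0])
print(run1, run2)  # [('k', 1)] [('k', 2)]
```

If all fields are part of the distinct key, the two runs would agree, which is why the non-determinism only shows up with extra payload fields.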

Re: Flink Rebalance

2018-08-10 Thread Fabian Hueske
Hi, Elias and Paul have good points. I think the performance degradation is mostly due to the lack of function chaining in the rebalance case. If all steps are just map functions, they can be chained in the no-rebalance case. That means records are passed via function calls. If you add rebalancing,
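The chaining cost Fabian describes can be made concrete with a toy comparison (Python for illustration; `pickle` stands in for Flink's serialization stack): chained maps pass records as plain function calls, while a rebalance forces every record through serialize/ship/deserialize.

```python
# Toy illustration: chaining = direct function calls,
# rebalance = serialization boundary between the two maps.
import pickle

def map1(x):
    return x + 1

def map2(x):
    return x * 2

def chained(records):
    # Both maps run in one task; records move via function calls.
    return [map2(map1(r)) for r in records]

def rebalanced(records):
    # A rebalance between the maps: each record is serialized,
    # "shipped", and deserialized before the second map sees it.
    shipped = [pickle.dumps(map1(r)) for r in records]
    return [map2(pickle.loads(b)) for b in shipped]

data = list(range(5))
print(chained(data))  # [2, 4, 6, 8, 10]
assert chained(data) == rebalanced(data)  # same result, extra ser/de cost
```

The results are identical; only the per-record overhead differs, which matches the observed slowdown when rebalance() is added between otherwise chainable maps.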

Re: JDBCInputFormat and SplitDataProperties

2018-08-10 Thread Alexis Sarda
It seems I may have spoken too soon. After executing the job with more data, I can see the following things in the Flink dashboard: - The first subtask is a chained DataSource -> GroupCombine. Even with parallelism set to 24 and a ParameterValuesProvider returning Array(Array("first"),
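The split behavior Alexis describes can be sketched abstractly (plain Python, not the JDBC input format; the query and parameter rows are placeholders): a parameterized query is expanded into one independent split per parameter row, and those splits are what get distributed over the parallel source tasks.

```python
# Hypothetical sketch: one input split per parameter row, as a
# ParameterValuesProvider would supply to a parameterized query.

query = "SELECT * FROM t WHERE category = ?"          # placeholder query
parameter_values = [("first",), ("second",), ("third",)]

# Each split is an independent unit of work: (split id, query, params).
splits = [(i, query, params) for i, params in enumerate(parameter_values)]

# With source parallelism 2, splits are spread across the readers;
# parallelism beyond the number of splits cannot be used.
parallelism = 2
assignment = {task: splits[task::parallelism] for task in range(parallelism)}
print(assignment)
```

With only three parameter rows, at most three source subtasks can do work, regardless of how high the configured parallelism is.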

Re: Standalone cluster instability

2018-08-10 Thread Piotr Nowojski
Hi, Please post full TaskManager logs, including stderr and stdout. (Have you checked the stderr/stdout for some messages?) I can think of a couple of reasons: 1. process segfault 2. process killed by the OS 3. OS failure 1. Should be visible via some message in the stderr/stdout file and can be caused by

Re: JDBCInputFormat and SplitDataProperties

2018-08-10 Thread Alexis Sarda
It ended up being a wrong configuration of the cluster; there was only 1 task manager with 1 slot. If I submit a job with "flink run -p 24 ...", will the job hang until at least 24 slots are available? Regards, Alexis. On Fri, 10 Aug 2018, 14:01 Fabian Hueske wrote: > Can you share the plan