What happens if my parallelism is greater than the number of Kafka partitions?

2017-11-08 Thread yunfan123
It seems the same partition's data will be consumed multiple times?

Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
Hi Ebru and Javier, Yes, if you could share this example job it would be helpful. Ebru: could you explain in a little more detail what your job(s) look like? Could you post some code? If you are just using maps and filters there shouldn’t be any network transfers involved, aside from

Re: FlinkCEP behaviour with time constraints not as expected

2017-11-08 Thread Dawid Wysakowicz
Unfortunately, there is a mistake in the docs: the return type should be DataStream rather than SingleOutputStream. The correct version should be: val patternStream: PatternStream[Event] = CEP.pattern(input, pattern) val outputTag = OutputTag[String]("side-output") val result:

Re: What happens if my parallelism is greater than the number of Kafka partitions?

2017-11-08 Thread yunfan123
The Kafka partition assignment code is as follows: public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) { int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFF) % numParallelSubtasks; // here, the assumption is that

Re: Flink memory leak

2017-11-08 Thread ebru
Hi Javier, It would be helpful if you share your test job with us. Which configurations did you try? -Ebru > On 8 Nov 2017, at 14:43, Javier Lopez wrote: > > Hi, > > We have been facing a similar problem. We have tried some different > configurations, as proposed in

Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
Hi, Thanks for sharing this job. Do I need to feed some data to Kafka to reproduce this issue with your script? Does this OOM issue also happen when you are not using the Kafka source/sink? Piotrek > On 8 Nov 2017, at 14:08, Javier Lopez wrote: > > Hi, > >

Re: Flink memory leak

2017-11-08 Thread ÇETİNKAYA EBRU
On 2017-11-08 15:20, Piotr Nowojski wrote: Hi Ebru and Javier, Yes, if you could share this example job it would be helpful. Ebru: could you explain in a little more detail what your job(s) look like? Could you post some code? If you are just using maps and filters there shouldn’t be any

Re: FlinkCEP behaviour with time constraints not as expected

2017-11-08 Thread Federico D'Ambrosio
Thank you very much, that was really helpful. Cheers, Federico 2017-11-08 13:51 GMT+01:00 Dawid Wysakowicz : > Unfortunately, there is a mistake in the docs: the return type should be > DataStream rather than SingleOutputStream. > > The correct version should be: > > val

Re: Flink memory leak

2017-11-08 Thread Javier Lopez
Hi, You don't need data. With data it will die faster. I tested as well with a small data set, using the fromElements source, but it will take some time to die. It's better with some data. On 8 November 2017 at 14:54, Piotr Nowojski wrote: > Hi, > > Thanks for sharing

Re: Flink memory leak

2017-11-08 Thread Javier Lopez
Hi, We have been facing a similar problem. We have tried some different configurations, as proposed in another email thread by Flavio and Kien, but it didn't work. We have a workaround similar to the one that Flavio has: we restart the taskmanagers once they reach a memory threshold. We created a

Re: FlinkCEP behaviour with time constraints not as expected

2017-11-08 Thread Federico D'Ambrosio
Thank you very much, Dawid, for your thorough explanation, really useful. I totally missed the distinction between timed-out events and complete matches. I'd like to ask you one more thing, about the FlinkCEP Scala API: in the documentation, there is the following code: val patternStream:

Re: Flink memory leak

2017-11-08 Thread Javier Lopez
Hi, This is the test Flink job we created to trigger this leak: https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 And this is the Python script we are using to execute the job thousands of times to get the OOM problem

Re: Kafka Consumer fetch-size/rate and Producer queue timeout

2017-11-08 Thread Tzu-Li (Gordon) Tai
Hi Ashish, From your description I do not yet have much of an idea of what may be happening. However, some of your observations seem reasonable. I’ll go through them one by one: I did try to modify request.timeout.ms, linger.ms etc to help with the issue if it were caused by a sudden burst

Re: Flink memory leak

2017-11-08 Thread Aljoscha Krettek
@Nico & @Piotr Could you please have a look at this? You both recently worked on the network stack and might be most familiar with this. > On 8. Nov 2017, at 10:25, Flavio Pompermaier wrote: > > We also have the same problem in production. At the moment the solution is to

Re: Flink memory leak

2017-11-08 Thread Javier Lopez
Yes, I tested with just printing the stream. But it could take a lot of time to fail. On Wednesday, 8 November 2017, Piotr Nowojski wrote: > Thanks for the quick answer. > So it will also fail after some time with `fromElements` source instead of Kafka, right? > Did you try

Generate watermarks per key in a KeyedStream

2017-11-08 Thread Shailesh Jain
Hi, I'm working on implementing a use case wherein different physical devices are sending events, and due to network/power issues, there can be a delay in receiving events at the Flink source. One of the operators within the Flink job is the Pattern operator, and there are certain patterns which are

Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
I don’t know if this is relevant to this issue, but I was constantly getting failures trying to reproduce this leak using your Job, because you were using a non-deterministic getKey function: @Override public Integer getKey(Integer event) { Random randomGen = new Random((new Date()).getTime());
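To illustrate the point about key determinism, here is a minimal plain-Java sketch that does not use the Flink API. It assumes the quoted getKey returns a value drawn from its time-seeded Random, which the truncated snippet does not actually show. The class name, the `isStable` helper and the modulo-10 key are hypothetical and chosen only for illustration.

```java
import java.util.Random;
import java.util.function.Function;

public class DeterministicKeyDemo {
    // Non-deterministic key (assumed behaviour of the quoted getKey): a freshly
    // seeded Random picks the key, so the same event may get different keys.
    public static final Function<Integer, Integer> RANDOM_KEY =
            event -> new Random(System.nanoTime()).nextInt(10);

    // Deterministic alternative (hypothetical): the key is a pure function of
    // the event, so repeated calls always agree.
    public static final Function<Integer, Integer> STABLE_KEY =
            event -> Math.floorMod(event, 10);

    // Returns true if keyFn yields the same key for the same event on every call.
    public static boolean isStable(Function<Integer, Integer> keyFn, int event, int calls) {
        int first = keyFn.apply(event);
        for (int i = 1; i < calls; i++) {
            if (keyFn.apply(event) != first) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("stable key consistent: " + isStable(STABLE_KEY, 42, 1000));
        System.out.println("random key consistent: " + isStable(RANDOM_KEY, 42, 1000));
    }
}
```

A keyed stream relies on the key being reproducible for a given record, so a function shaped like STABLE_KEY is the safer pattern whatever the actual key logic is.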

Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
Thanks for the quick answer. So it will also fail after some time with `fromElements` source instead of Kafka, right? Did you try it also without a Kafka producer? Piotrek > On 8 Nov 2017, at 14:57, Javier Lopez wrote: > > Hi, > > You don't need data. With data it

Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
Btw, Ebru: I don’t agree that the main suspect is NetworkBufferPool. On your screenshots its memory consumption was reasonable and stable: 596MB -> 602MB -> 597MB. PoolThreadCache memory usage of ~120MB is also reasonable. Do you experience any problems, like Out Of Memory errors/crashes/long

Weird performance on custom Hashjoin w.r.t. parallelism

2017-11-08 Thread m@xi
Hello everyone! I have implemented a custom parallel hashjoin algorithm (without the windows feature) in order to calculate the join of two input streams on a common attribute using the CoFlatMap function and the state. After the join operator (which has parallelism p = #processors) I have a

Re: Job Manager Configuration

2017-11-08 Thread Till Rohrmann
Quick question Regina: Which version of Flink are you running? Cheers, Till On Tue, Nov 7, 2017 at 4:38 PM, Till Rohrmann wrote: > Hi Regina, > > the user code is uploaded once to the `JobManager` and then downloaded > from each `TaskManager` once when it first

Re: Generate watermarks per key in a KeyedStream

2017-11-08 Thread Xingcan Cui
Hi Shailesh, actually, the watermarks are generated per partition, but all of them will be forcibly aligned to the minimum one during processing. That is decided by the semantics of watermark and KeyedStream, i.e., the watermarks belong to a whole stream and a stream is made up of different
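The alignment to the minimum watermark described above can be sketched in plain Java, without the Flink API. This is only an illustration of the idea: the class name, the per-channel array and the `onWatermark` method are hypothetical, and Flink's actual implementation is not shown in this thread.

```java
import java.util.Arrays;

public class WatermarkAlignmentDemo {
    // Latest watermark seen on each input channel (hypothetical bookkeeping).
    private final long[] channelWatermarks;
    // Event time of the operator: never moves backwards.
    private long current = Long.MIN_VALUE;

    public WatermarkAlignmentDemo(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel and returns the operator's event
    // time, which is the minimum over all channels.
    public long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        current = Math.max(current, min);
        return current;
    }

    public static void main(String[] args) {
        WatermarkAlignmentDemo op = new WatermarkAlignmentDemo(2);
        System.out.println(op.onWatermark(0, 100L)); // channel 1 has not reported yet
        System.out.println(op.onWatermark(1, 40L));  // the slower channel holds time back
        System.out.println(op.onWatermark(1, 120L)); // now channel 0 is the minimum
    }
}
```

In this sketch a single delayed channel holds back event time for the whole operator, which is why a late device delays pattern evaluation for every key that shares the stream.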

Re: ResultPartitionMetrics

2017-11-08 Thread Nico Kruber
Hi Aitozi, the difference is the scope: the normal metrics (without taskmanager.net.detailed-metrics) reflect _all_ buffers of a task while the detailed statistics are more fine-grained and give you statistics per input (or output) gate - the "total" there reflects the fact that each gate has

Do timestamps and watermarks exist after window evaluation?

2017-11-08 Thread Derek VerLee
When composing ("chaining") multiple windowing operations on the same stream, are watermarks transmitted downstream after window evaluation, and are the records emitted from WindowFunctions given timestamps? Do I need to or should I always assignTimestampsAndWatermarks to the outputs of window

RE: Job Manager Configuration

2017-11-08 Thread Chan, Regina
Thanks for the responses! I’m currently using 1.2.0; I’m going to bump it up once I have things stabilized. I haven’t defined any slot sharing groups but I do think that I’ve probably got my job configured suboptimally. I’ve refactored my code so that I can submit subsets of the flow at a time

Re: What happens if my parallelism is greater than the number of Kafka partitions?

2017-11-08 Thread Tzu-Li (Gordon) Tai
The `KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks)` method returns the index of the target subtask for a given Kafka partition. The implementation in that method ensures that the same subtask index will always be returned for the same partition. Each consumer subtask will
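The behaviour described in this thread can be tried out with a small plain-Java sketch that takes a topic name and partition id instead of Flink's KafkaTopicPartition. The start-index line follows the snippet quoted earlier in the thread; the sign-bit mask constant and the final `return` line are assumptions, since the quoted code is cut off before that point. The class name and the `idleSubtasks` helper are hypothetical.

```java
public class PartitionAssignmentDemo {
    // Maps a partition to a subtask index. The return line is an assumption:
    // it offsets the partition id from the topic's start index, round-robin.
    public static int assign(String topic, int partition, int numParallelSubtasks) {
        // The mask keeps the value non-negative; the exact constant is assumed.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    // Counts the subtasks that receive no partition at all.
    public static int idleSubtasks(String topic, int numPartitions, int parallelism) {
        boolean[] used = new boolean[parallelism];
        for (int p = 0; p < numPartitions; p++) {
            used[assign(topic, p, parallelism)] = true;
        }
        int idle = 0;
        for (boolean u : used) {
            if (!u) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        // Two partitions read with parallelism 4 (topic name is made up).
        for (int p = 0; p < 2; p++) {
            System.out.println("partition " + p + " -> subtask " + assign("events", p, 4));
        }
        System.out.println("idle subtasks: " + idleSubtasks("events", 2, 4));
    }
}
```

Under these assumptions each partition maps to exactly one subtask, so no partition is read twice, and with more subtasks than partitions the surplus subtasks simply receive nothing.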

Broadcast to all the other operators

2017-11-08 Thread Ladhari Sadok
Hello, I'm working on a rules engine project with Flink 1.3. In this project I want to update some keyed operator state when an external event occurs. I have a DataStream of updates (from Kafka), and I want to broadcast the data contained in this stream to all keyed operators so I can change the state in

When using Flink for CEP, can the data in Cassandra database be used for state

2017-11-08 Thread shyla deshpande
Hello all, I am new to Flink. We have our data in a Cassandra database. We have a use case for CEP. I am checking whether Flink is a good fit for us. When processing the event data, I may want to pull data from the Cassandra database, like the user profile, and join it with the event data. Is there a way

How to best create a bounded session window?

2017-11-08 Thread Vishal Santoshi
I am implementing a bounded session window, but I need to short-circuit the session if the session length (in count of events or time) goes beyond a configured limit, a very reasonable scenario (bots etc.). I am using the approach as listed. I am not sure though if the Window itself is being

Re: Flink memory leak

2017-11-08 Thread Flavio Pompermaier
We also have the same problem in production. At the moment the solution is to restart the entire Flink cluster after every job. We've tried to reproduce this problem with a test (see https://issues.apache.org/jira/browse/FLINK-7845) but we don't know whether the error produced by the test and the

Re: What happens if my parallelism is greater than the number of Kafka partitions?

2017-11-08 Thread Tzu-Li (Gordon) Tai
Hi! You can set the parallelism of the Flink Kafka Consumer independent of the number of partitions. If there are more consumer subtasks than the number of Kafka partitions to read (i.e. when the parallelism of the consumer is set higher than the number of partitions), some subtasks will

Re: Remove the HDFS directory in org.apache.flink.util.FileUtils.deletePathIfEmpty

2017-11-08 Thread Aljoscha Krettek
Hi, Your images did not make it through to the mailing list. Best, Aljoscha > On 8. Nov 2017, at 05:25, 马庆祥 wrote: > > Hi all, > > I enabled checkpointing with the configuration in the figure below. > > > It works, but I keep getting the exception below: > > > I

Re: Broadcast to all the other operators

2017-11-08 Thread Tony Wei
Hi Sadok, Since you want to broadcast the rule stream to all subtasks, it seems that it is not necessary to use a KeyedStream. How about using the broadcast partitioner, connecting the two streams to attach the rule to each record or apply the rule to them directly, and doing the keyed operation after that? If you need to do

Re: Weird performance on custom Hashjoin w.r.t. parallelism

2017-11-08 Thread m@xi
Hello! I found out that the cause of the problem was the map that I have after the parallel join, which had parallelism 1. When I changed it to .map(new MyMapMeter).setParallelism(p), the completion time decreases as I increase the parallelism p, which is reasonable. Somehow it was a

Re: Kafka Consumer fetch-size/rate and Producer queue timeout

2017-11-08 Thread Ashish Pokharel
Hi Gordon, Thanks for your responses. It definitely makes sense. I could pull this stack from the logs; the entire log itself is pretty big, so let me know if some samples before/after this may help. TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:

Re: Broadcast to all the other operators

2017-11-08 Thread Ladhari Sadok
Thank you for the answer. I know that solution, but I don't want to stream the rules all the time. In my case I have the rules in Redis, and they are loaded at Flink startup. I want to broadcast changes only when they occur. Thanks. On 9 Nov 2017 7:51 AM, "Tony Wei"