Re: Parallel CEP

2019-01-23 Thread Dian Fu
I'm afraid you cannot do that. Inputs with the same key must be
processed by the same CEP operator; otherwise the results will be
nondeterministic and wrong.

Regards,
Dian

> On Jan 24, 2019, at 2:56 PM, dhanuka ranasinghe wrote:
> 
> In this example the key will be the same. I am using 1 million messages with the same 
> key for performance testing, but I still want to process them in parallel. Can't I 
> use the Split function and get a SplitStream for that purpose?
> 
> On Thu, Jan 24, 2019 at 2:49 PM Dian Fu wrote:
> Hi Dhanuka,
> 
> Does the KeySelector of Event::getTriggerID generate the same key for all the 
> inputs, or does it generate only a few key values that happen to be hashed to the 
> same downstream operator? You can print the results of 
> Event::getTriggerID to check whether that is the case.
> 
> Regards,
> Dian
> 
>> On Jan 24, 2019, at 2:08 PM, dhanuka ranasinghe wrote:
>> 
>> Hi Dian,
>> 
>> Thanks for the explanation. Please find the screenshot and source code for 
>> the above-mentioned use case. The main issue is that although I use KeyedStream, 
>> the parallelism is not applied properly.
>> Only one host is processing messages.
>> 
>> 
>> 
>> Cheers,
>> Dhanuka
>> 
>> On Thu, Jan 24, 2019 at 1:40 PM Dian Fu wrote:
>> Whether to use KeyedStream depends on the logic of your job, i.e., whether you 
>> are looking for patterns per partition, e.g., patterns for a particular 
>> user. If so, you should partition the input data before the CEP operator. 
>> Otherwise, the input data should not be partitioned.
>> 
>> Regards,
>> Dian 
>> 
>>> On Jan 24, 2019, at 12:37 PM, dhanuka ranasinghe wrote:
>>> 
>>> Hi Dian,
>>> 
>>> I tried that, but then the Kafka producer only produces to a single partition and 
>>> only a single Flink host is working while the rest do not contribute to processing. 
>>> I will share the code and screenshot.
>>> 
>>> Cheers 
>>> Dhanuka
>>> 
>>> On Thu, 24 Jan 2019, 12:31 Dian Fu wrote:
>>> Hi Dhanuka,
>>> 
>>> In order to make the CEP operator run in parallel, the input stream should 
>>> be a KeyedStream. You can refer to [1] for detailed information.
>>> 
>>> Regards,
>>> Dian
>>> 
>>> [1]: 
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#detecting-patterns
>>>  
>>> 
>>> 
>>> > On Jan 24, 2019, at 10:18 AM, dhanuka ranasinghe wrote:
>>> > 
>>> > Hi All,
>>> > 
>>> > Is there a way to run the CEP function in parallel? Currently CEP runs only 
>>> > sequentially.
>>> > 
>>> > 
>>> > Cheers,
>>> > Dhanuka
>>> > 
>>> > -- 
>>> > Nothing Impossible,Creativity is more important than knowledge.
>>> 
>> 
>> 
>> 
>> -- 
>> Nothing Impossible,Creativity is more important than knowledge.
>> 
> 
> 
> 
> -- 
> Nothing Impossible,Creativity is more important than knowledge.



Re: Parallel CEP

2019-01-23 Thread dhanuka ranasinghe
In this example the key will be the same. I am using 1 million messages with the same
key for performance testing, but I still want to process them in parallel.
Can't I use the Split function and get a SplitStream for that purpose?

On Thu, Jan 24, 2019 at 2:49 PM Dian Fu  wrote:

> Hi Dhanuka,
>
> Does the KeySelector of Event::getTriggerID generate the same key for all
> the inputs, or does it generate only a few key values that happen to be hashed
> to the same downstream operator? You can print the results of
> Event::getTriggerID to check whether that is the case.
>
> Regards,
> Dian
>
> On Jan 24, 2019, at 2:08 PM, dhanuka ranasinghe wrote:
>
> Hi Dian,
>
> Thanks for the explanation. Please find the screenshot and source code
> for the above-mentioned use case. The main issue is that although I use KeyedStream,
> the parallelism is not applied properly.
> Only one host is processing messages.
>
> 
>
> Cheers,
> Dhanuka
>
> On Thu, Jan 24, 2019 at 1:40 PM Dian Fu  wrote:
>
>> Whether to use KeyedStream depends on the logic of your job, i.e., whether
>> you are looking for patterns per partition, e.g., patterns for a
>> particular user. If so, you should partition the input data before the CEP
>> operator. Otherwise, the input data should not be partitioned.
>>
>> Regards,
>> Dian
>>
>> On Jan 24, 2019, at 12:37 PM, dhanuka ranasinghe wrote:
>>
>> Hi Dian,
>>
>> I tried that, but then the Kafka producer only produces to a single partition and
>> only a single Flink host is working while the rest do not contribute to processing. I
>> will share the code and screenshot.
>>
>> Cheers
>> Dhanuka
>>
>> On Thu, 24 Jan 2019, 12:31 Dian Fu wrote:
>>> Hi Dhanuka,
>>>
>>> In order to make the CEP operator run in parallel, the input stream
>>> should be a KeyedStream. You can refer to [1] for detailed information.
>>>
>>> Regards,
>>> Dian
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#detecting-patterns
>>>
>>> > On Jan 24, 2019, at 10:18 AM, dhanuka ranasinghe wrote:
>>> >
>>> > Hi All,
>>> >
>>> > Is there a way to run the CEP function in parallel? Currently CEP runs only
>>> > sequentially.
>>> >
>>> >
>>> > Cheers,
>>> > Dhanuka
>>> >
>>> > --
>>> > Nothing Impossible,Creativity is more important than knowledge.
>>>
>>>
>>
>
> --
> Nothing Impossible,Creativity is more important than knowledge.
> 
>
>
>

-- 
Nothing Impossible,Creativity is more important than knowledge.


Re: Parallel CEP

2019-01-23 Thread Dian Fu
Hi Dhanuka,

Does the KeySelector of Event::getTriggerID generate the same key for all the 
inputs, or does it generate only a few key values that happen to be hashed to the 
same downstream operator? You can print the results of 
Event::getTriggerID to check whether that is the case.
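
For example, a quick way to eyeball the key distribution (just a sketch; it assumes
"input" is the DataStream<Event> that feeds the CEP operator and that getTriggerID()
returns a String):

import org.apache.flink.api.common.functions.MapFunction;

// Hypothetical diagnostic: print the key each element would be routed by.
input.map(new MapFunction<Event, String>() {
    @Override
    public String map(Event event) {
        return event.getTriggerID();
    }
}).print();

If this prints only one (or very few) distinct values, all elements end up on the
same parallel CEP instance.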

Regards,
Dian

> On Jan 24, 2019, at 2:08 PM, dhanuka ranasinghe wrote:
> 
> Hi Dian,
> 
> Thanks for the explanation. Please find the screenshot and source code for 
> the above-mentioned use case. The main issue is that although I use KeyedStream, 
> the parallelism is not applied properly.
> Only one host is processing messages.
> 
> 
> 
> Cheers,
> Dhanuka
> 
> On Thu, Jan 24, 2019 at 1:40 PM Dian Fu wrote:
> Whether to use KeyedStream depends on the logic of your job, i.e., whether you 
> are looking for patterns per partition, e.g., patterns for a particular 
> user. If so, you should partition the input data before the CEP operator. 
> Otherwise, the input data should not be partitioned.
> 
> Regards,
> Dian 
> 
>> On Jan 24, 2019, at 12:37 PM, dhanuka ranasinghe wrote:
>> 
>> Hi Dian,
>> 
>> I tried that, but then the Kafka producer only produces to a single partition and 
>> only a single Flink host is working while the rest do not contribute to processing. 
>> I will share the code and screenshot.
>> 
>> Cheers 
>> Dhanuka
>> 
>> On Thu, 24 Jan 2019, 12:31 Dian Fu wrote:
>> Hi Dhanuka,
>> 
>> In order to make the CEP operator run in parallel, the input stream should 
>> be a KeyedStream. You can refer to [1] for detailed information.
>> 
>> Regards,
>> Dian
>> 
>> [1]: 
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#detecting-patterns
>>  
>> 
>> 
>> > On Jan 24, 2019, at 10:18 AM, dhanuka ranasinghe wrote:
>> > 
>> > Hi All,
>> > 
>> > Is there a way to run the CEP function in parallel? Currently CEP runs only 
>> > sequentially.
>> > 
>> > 
>> > Cheers,
>> > Dhanuka
>> > 
>> > -- 
>> > Nothing Impossible,Creativity is more important than knowledge.
>> 
> 
> 
> 
> -- 
> Nothing Impossible,Creativity is more important than knowledge.
> 



[Flink 1.7.0] initial failures with starting high-parallelism job without checkpoint/savepoint

2019-01-23 Thread Steven Wu
When we start a high-parallelism (1,600) job without any
checkpoint/savepoint, the job struggled to be deployed. After a few
restarts, it eventually got deployed and was running fine after the initial
struggle. The JobManager was very busy and the Web UI was very slow. I saw these two
exceptions/failures during the initial failures.

I don't seem to see this issue when starting the same job from an external
checkpoint, or at least only very rarely.

Has anyone else experienced a similar issue?

Thanks,
Steven

Exception #1

java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
fe55bf158e89cf555be6582e577b9621 timed out.

at
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1624)

at
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)

at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)

at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Exception #2

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
Partition 3cdc03de22920b0dc1ef3d82e06b7d75@7fda7294a52a6da2d831d832223d3627
not found.

at
org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)

at
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:111)

at
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:155)

at java.util.TimerThread.mainLoop(Timer.java:555)

at java.util.TimerThread.run(Timer.java:505)


Re: Parallel CEP

2019-01-23 Thread Dian Fu
Whether to use KeyedStream depends on the logic of your job, i.e., whether you 
are looking for patterns per partition, e.g., patterns for a particular 
user. If so, you should partition the input data before the CEP operator. 
Otherwise, the input data should not be partitioned.

Regards,
Dian 

> On Jan 24, 2019, at 12:37 PM, dhanuka ranasinghe wrote:
> 
> Hi Dian,
> 
> I tried that, but then the Kafka producer only produces to a single partition and only 
> a single Flink host is working while the rest do not contribute to processing. I will 
> share the code and screenshot.
> 
> Cheers 
> Dhanuka
> 
> On Thu, 24 Jan 2019, 12:31 Dian Fu   wrote:
> Hi Dhanuka,
> 
> In order to make the CEP operator run in parallel, the input stream should be a 
> KeyedStream. You can refer to [1] for detailed information.
> 
> Regards,
> Dian
> 
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#detecting-patterns
>  
> 
> 
> > On Jan 24, 2019, at 10:18 AM, dhanuka ranasinghe wrote:
> > 
> > Hi All,
> > 
> > Is there a way to run the CEP function in parallel? Currently CEP runs only 
> > sequentially.
> > 
> > 
> > Cheers,
> > Dhanuka
> > 
> > -- 
> > Nothing Impossible,Creativity is more important than knowledge.
> 



Re: Flink CEP : Doesn't generate output

2019-01-23 Thread Dian Fu
Hi Dhanuka,

From the code you shared, it seems that you're using event time. The processing 
of elements is triggered by watermarks in event time, so you should define 
how to generate the watermark, e.g., with DataStream.assignTimestampsAndWatermarks.
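
For example, a minimal sketch of that (not taken from your job; "Event" and its
getEventTime() field are placeholders, and the 5 seconds of allowed out-of-orderness
is an arbitrary choice):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Assign event-time timestamps and watermarks before applying CEP.
private static DataStream<Event> withEventTime(DataStream<Event> input) {
    return input.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Event element) {
                return element.getEventTime(); // epoch milliseconds
            }
        });
}

The job also needs env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) so
that these watermarks are actually used.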

Regards,
Dian


> On Jan 23, 2019, at 1:58 AM, dhanuka ranasinghe wrote:
> 
> patternStream



Re: Parallel CEP

2019-01-23 Thread Dian Fu
Hi Dhanuka,

In order to make the CEP operator run in parallel, the input stream should be a 
KeyedStream. You can refer to [1] for detailed information.
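
Just to illustrate the idea (a rough sketch only, not taken from your job; the Event
POJO, the key selector and the pattern condition below are placeholders):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;

private static PatternStream<Event> keyedCep(DataStream<Event> input) {
    Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getValue() > 0; // placeholder condition
            }
        });

    // Keying the stream lets the CEP operator run with the job's parallelism.
    // Elements with the same key still go to the same parallel instance, so this
    // only helps if the key selector produces many distinct keys.
    return CEP.pattern(
        input.keyBy(new KeySelector<Event, String>() {
            @Override
            public String getKey(Event event) {
                return event.getTriggerID(); // placeholder key field
            }
        }),
        pattern);
}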

Regards,
Dian

[1]: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#detecting-patterns

> On Jan 24, 2019, at 10:18 AM, dhanuka ranasinghe wrote:
> 
> Hi All,
> 
> Is there a way to run the CEP function in parallel? Currently CEP runs only sequentially.
> 
> 
> Cheers,
> Dhanuka
> 
> -- 
> Nothing Impossible,Creativity is more important than knowledge.



Re: Flink CEP : Doesn't generate output

2019-01-23 Thread dhanuka ranasinghe
Thank you for the clarification.

On Thu, 24 Jan 2019, 12:44 Dian Fu wrote:

> Hi Dhanuka,
>
> From the code you shared, it seems that you're using event time. The
> processing of elements is triggered by watermarks in event time, so you
> should define how to generate the watermark, e.g., with
> DataStream.assignTimestampsAndWatermarks
>
> Regards,
> Dian
>
>
> On Jan 23, 2019, at 1:58 AM, dhanuka ranasinghe wrote:
>
> patternStream
>
>
>


Re: Parallel CEP

2019-01-23 Thread dhanuka ranasinghe
Hi Dian,

I tried that, but then the Kafka producer only produces to a single partition and
only a single Flink host is working while the rest do not contribute to processing. I
will share the code and screenshot.

Cheers
Dhanuka

On Thu, 24 Jan 2019, 12:31 Dian Fu wrote:

> Hi Dhanuka,
>
> In order to make the CEP operator run in parallel, the input stream should
> be a KeyedStream. You can refer to [1] for detailed information.
>
> Regards,
> Dian
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#detecting-patterns
>
> > On Jan 24, 2019, at 10:18 AM, dhanuka ranasinghe wrote:
> >
> > Hi All,
> >
> > Is there a way to run the CEP function in parallel? Currently CEP runs only
> > sequentially.
> >
> >
> > Cheers,
> > Dhanuka
> >
> > --
> > Nothing Impossible,Creativity is more important than knowledge.
>
>


Re: When can the savepoint directory be deleted?

2019-01-23 Thread Ben Yan
hi

I got it. Thanks!

Best
Ben

Kien Truong wrote on Wed, Jan 23, 2019 at 10:31 PM:

> Hi,
>
> As of Flink 1.7, the savepoint should not be deleted until after the
> first checkpoint has been successfully taken.
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/release-notes/flink-1.7.html#savepoints-being-used-for-recovery
>
>
> Regards,
>
> Kien
>
>
> On 1/23/2019 6:57 PM, Ben Yan wrote:
> > hi:
> >
> > Can I delete this savepoint directory immediately after the job
> > resumes running from the savepoint directory?
> >
> > Best
> > Ben
>


Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-23 Thread Aaron Levin
Hi Ufuk,

One more update: I tried copying all the Hadoop native `.so` files (mainly
`libhadoop.so`) into `/lib` and I am still experiencing the issue I
reported. I also tried naively adding the `.so` files to the jar with the
Flink application and am still experiencing the issue I reported (however,
I'm going to investigate this further as I might not have done it
correctly).

Best,

Aaron Levin

On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin  wrote:

> Hi Ufuk,
>
> Two updates:
>
> 1. As suggested in the ticket, I naively copied every `.so` in
> `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My
> knowledge of how shared libs get picked up is hazy, so I'm not sure if
> blindly copying them like that should work. I did check what
> `System.getProperty("java.library.path")` returns at the call-site and
> it's: 
> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
> 2. The exception I see comes from
> `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below).
> This uses `System.loadLibrary("hadoop")`.
>
> [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError:
> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
> [2019-01-23 19:52:33.081376]  at
> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
> [2019-01-23 19:52:33.081406]  at
> org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
> [2019-01-23 19:52:33.081429]  at
> org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
> [2019-01-23 19:52:33.081457]  at
> org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
> [2019-01-23 19:52:33.081494]  at
> org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
> [2019-01-23 19:52:33.081517]  at
> org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
> [2019-01-23 19:52:33.081549]  at
> org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1872)
> ... (redacted) ...
> [2019-01-23 19:52:33.081728]  at
> scala.collection.immutable.List.foreach(List.scala:392)
> ... (redacted) ...
> [2019-01-23 19:52:33.081832]  at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
> [2019-01-23 19:52:33.081854]  at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
> [2019-01-23 19:52:33.081882]  at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> [2019-01-23 19:52:33.081904]  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> [2019-01-23 19:52:33.081946]  at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> [2019-01-23 19:52:33.081967]  at java.lang.Thread.run(Thread.java:748)
>
> On Tue, Jan 22, 2019 at 2:31 PM Aaron Levin  wrote:
>
>> Hey Ufuk,
>>
>> So, I looked into this a little bit:
>>
>> 1. clarification: my issues are with the hadoop-related snappy libraries
>> and not libsnappy itself (this is my bad for not being clearer, sorry!). I
>> already have `libsnappy` on my classpath, but I am looking into including
>> the hadoop snappy libraries.
>> 2. exception: I don't see the class loading error. I'm going to try to
>> put some more instrumentation and see if I can get a clearer stacktrace
>> (right now I get an NPE on closing a sequence file in a finalizer - when I
>> last logged the exception it was something deep in hadoop's snappy libs -
>> I'll get clarification soon).
>> 3. I'm looking into including hadoop's snappy libs in my jar and we'll
>> see if that resolves the problem.
>>
>> Thanks again for your help!
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin 
>> wrote:
>>
>>> Hey,
>>>
>>> Thanks so much for the help! This is awesome. I'll start looking into
>>> all of this right away and report back.
>>>
>>> Best,
>>>
>>> Aaron Levin
>>>
>>> On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi  wrote:
>>>
 Hey Aaron,

 sorry for the late reply.

 (1) I think I was able to reproduce this issue using snappy-java. I've
 filed a ticket here:
 https://issues.apache.org/jira/browse/FLINK-11402. Can you check the
 ticket description whether it's in line with what you are
 experiencing? Most importantly, do you see the same Exception being
 reported after cancelling and re-starting the job?

 (2) I don't think it's caused by the environment options not being
 picked up. You can check the head of the log files of the JobManager
 or TaskManager to verify that your provided option is picked up as
 expected. You should see something similar to this:

 2019-01-21 22:53:49,863 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint -

 
 2019-01-21 22:53:49,864 INFO
 

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-23 Thread Aaron Levin
Hi Ufuk,

Two updates:

1. As suggested in the ticket, I naively copied every `.so` in
`hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My
knowledge of how shared libs get picked up is hazy, so I'm not sure if
blindly copying them like that should work. I did check what
`System.getProperty("java.library.path")` returns at the call-site and
it's: 
java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
2. The exception I see comes from
`hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below).
This uses `System.loadLibrary("hadoop")`.

[2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError:
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
[2019-01-23 19:52:33.081376]  at
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
[2019-01-23 19:52:33.081406]  at
org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
[2019-01-23 19:52:33.081429]  at
org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
[2019-01-23 19:52:33.081457]  at
org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
[2019-01-23 19:52:33.081494]  at
org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
[2019-01-23 19:52:33.081517]  at
org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
[2019-01-23 19:52:33.081549]  at
org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1872)
... (redacted) ...
[2019-01-23 19:52:33.081728]  at
scala.collection.immutable.List.foreach(List.scala:392)
... (redacted) ...
[2019-01-23 19:52:33.081832]  at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
[2019-01-23 19:52:33.081854]  at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
[2019-01-23 19:52:33.081882]  at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
[2019-01-23 19:52:33.081904]  at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
[2019-01-23 19:52:33.081946]  at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
[2019-01-23 19:52:33.081967]  at java.lang.Thread.run(Thread.java:748)
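
As a side note, a small hypothetical diagnostic (not part of the original message;
it only assumes hadoop-common on the classpath) that can be run on the TaskManager,
e.g. from a RichMapFunction.open(), to see what that JVM actually loads:

import org.apache.hadoop.util.NativeCodeLoader;

public class NativeLibCheck {
    public static void main(String[] args) {
        // Directories the JVM searches when System.loadLibrary("hadoop") is called
        System.out.println("java.library.path = " + System.getProperty("java.library.path"));
        // True only if libhadoop.so was found and loaded by this JVM
        System.out.println("hadoop native loaded = " + NativeCodeLoader.isNativeCodeLoaded());
        if (NativeCodeLoader.isNativeCodeLoaded()) {
            // Only safe to call once the native library has been loaded
            System.out.println("snappy supported = " + NativeCodeLoader.buildSupportsSnappy());
        }
    }
}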

On Tue, Jan 22, 2019 at 2:31 PM Aaron Levin  wrote:

> Hey Ufuk,
>
> So, I looked into this a little bit:
>
> 1. clarification: my issues are with the hadoop-related snappy libraries
> and not libsnappy itself (this is my bad for not being clearer, sorry!). I
> already have `libsnappy` on my classpath, but I am looking into including
> the hadoop snappy libraries.
> 2. exception: I don't see the class loading error. I'm going to try to put
> some more instrumentation and see if I can get a clearer stacktrace (right
> now I get an NPE on closing a sequence file in a finalizer - when I last
> logged the exception it was something deep in hadoop's snappy libs - I'll
> get clarification soon).
> 3. I'm looking into including hadoop's snappy libs in my jar and we'll see
> if that resolves the problem.
>
> Thanks again for your help!
>
> Best,
>
> Aaron Levin
>
> On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin 
> wrote:
>
>> Hey,
>>
>> Thanks so much for the help! This is awesome. I'll start looking into all
>> of this right away and report back.
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi  wrote:
>>
>>> Hey Aaron,
>>>
>>> sorry for the late reply.
>>>
>>> (1) I think I was able to reproduce this issue using snappy-java. I've
>>> filed a ticket here:
>>> https://issues.apache.org/jira/browse/FLINK-11402. Can you check the
>>> ticket description whether it's in line with what you are
>>> experiencing? Most importantly, do you see the same Exception being
>>> reported after cancelling and re-starting the job?
>>>
>>> (2) I don't think it's caused by the environment options not being
>>> picked up. You can check the head of the log files of the JobManager
>>> or TaskManager to verify that your provided option is picked up as
>>> expected. You should see something similar to this:
>>>
>>> 2019-01-21 22:53:49,863 INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>>
>>> 
>>> 2019-01-21 22:53:49,864 INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>> Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0,
>>> Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
>>> ...
>>> 2019-01-21 22:53:49,865 INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM
>>> Options:
>>> 2019-01-21 22:53:49,865 INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>> -Xms1024m
>>> 2019-01-21 22:53:49,865 INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>> -Xmx1024m
>>> You are looking for this line > 2019-01-21 22:53:49,865 INFO
>>> 

Re: Trouble migrating state from 1.6.3 to 1.7.1

2019-01-23 Thread pwestermann
Thanks Gordon,

I get the same exception in the JM logs and that looks like it's causing the
job failure.





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Trouble migrating state from 1.6.3 to 1.7.1

2019-01-23 Thread Tzu-Li (Gordon) Tai
Thanks for the logs.

Is the job restore actually failing? If yes, there should be an exception
for the exact cause of the failure.

Otherwise, the AvroSerializer warnings in the taskmanager logs are actually
expected behaviour when restoring from savepoint versions before 1.7.x, and
shouldn't cause job failures (unless something unexpected is happening).
Shortly put, to describe the cause of that warning:
Previously in 1.6.x, the AvroSerializer was Java-serialized into savepoints.
In 1.7.x, when restoring from previous version savepoints, that serializer
will still be attempted to be read using Java serialization (which explains
the InvalidClassException in the WARN log).
However, starting from 1.7 we no longer rely on serializers being written
directly into savepoints, so whether or not reading that serializer was
successful should not matter and the restore should proceed normally.

Please do let me know if the job is actually failing, then we should
investigate further. If it is failing, there should be an exception in the
JM logs identifying the cause of job failure.
CC'ing Igal, as he worked on the AvroSerializer for 1.7.x and might have
more info.

Cheers,
Gordon

On Wed, Jan 23, 2019 at 7:42 PM pwestermann 
wrote:

> There is not much in the log as this immediately happens when I start the
> job. I attached one of the taskmanager logs. The first error message I see
> is  /Could not read a requested serializer. Replaced with a
> UnloadableDummyTypeSerializer./ and the exception is
>
>
> taskmanager.log
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1547/taskmanager.log>
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Trouble migrating state from 1.6.3 to 1.7.1

2019-01-23 Thread pwestermann
There is not much in the log as this immediately happens when I start the
job. I attached one of the taskmanager logs. The first error message I see
is  /Could not read a requested serializer. Replaced with a
UnloadableDummyTypeSerializer./ and the exception is


taskmanager.log

  




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-23 Thread Thomas Weise
+1 for trimming the size by default and offering the fat distribution as
alternative download


On Wed, Jan 23, 2019 at 8:35 AM Till Rohrmann  wrote:

> Ufuk's proposal (having a lean default release and a user convenience
> tarball) sounds good to me. That way advanced users won't be bothered by an
> unnecessarily large release and new users can benefit from having many
> useful extensions bundled in one tarball.
>
> Cheers,
> Till
>
> On Wed, Jan 23, 2019 at 3:42 PM Ufuk Celebi  wrote:
>
> > On Wed, Jan 23, 2019 at 11:01 AM Timo Walther 
> wrote:
> > > I think what is more important than a big dist bundle is a helpful
> > > "Downloads" page where users can easily find available filesystems,
> > > connectors, metric reporters. Not everyone checks Maven central for
> > > available JAR files. I just saw that we added an "Optional components"
> > > section recently [1]; we just need to make it more prominent. This is
> > > also done for the SQL connectors and formats [2].
> >
> > +1 I fully agree with the importance of the Downloads page. We
> > definitely need to make any optional dependencies that users need to
> > download easy to find.
> >
>


RE: [Flink 1.6] How to get current total number of processed events

2019-01-23 Thread Thanh-Nhan Vo
Hi Kien Truong,

Thank you for your answer. I have another question, please!
If I count the number of messages processed for a given key j (denoted c_j), is 
there a way to retrieve max{c_j}, min{c_j}?

Thanks

From: Kien Truong [mailto:duckientru...@gmail.com]
Sent: Wednesday, January 23, 2019 16:04
To: user@flink.apache.org
Subject: Re: [Flink 1.6] How to get current total number of processed events


Hi Nhan,

Logically, the total number of processed events before an event cannot be 
accurately calculated unless event processing is synchronized.

This is not scalable, so naturally I don't think Flink supports it.

That said, I suppose you can get an approximate count by using a non-keyed 
tumbling window: count the items inside the window, then use that value in the 
next window.



Regards,

Kien


On 1/21/2019 9:34 PM, Thanh-Nhan Vo wrote:
Hello all,

I have a question, please!
I'm using Flink 1.6 to process our data in streaming mode.
I wonder if at a given event, there is a way to get the current total number of 
processed events (before this event).
If possible, I want to get this total number of processed events as a value 
state in a KeyedStream.
It means that for a given key in the KeyedStream, I want to retrieve not only the 
total number of processed events for this key but also the total number of 
processed events for all keys.

Is there a way to do this in Flink 1.6, please?
Best regard,
Nhan



Re: Trouble migrating state from 1.6.3 to 1.7.1

2019-01-23 Thread Tzu-Li (Gordon) Tai
Hi,

Thanks for reporting this.

Could you provide more details (error message, exception stack trace) about
the failure you are getting?
This is unexpected, as the changes to flink-avro serializers in 1.7.x
should be backwards compatible.
More details on how the restore failed will be helpful here.

Cheers,
Gordon


On Wed, Jan 23, 2019 at 2:54 PM pwestermann 
wrote:

> I am trying to migrate from Flink 1.6.3 to 1.7.1 but am not able to restore
> the job from a savepoint taken in 1.6.3.
>
> We are using an AsyncFunction to publish Avro records to SQS. The state for
> the AsyncWaitOperator cannot be restored because of serializer changes in
> flink-avro from 1.6.3 to 1.7.1.
>
> Any idea how to avoid this problem? Maybe start the job with flink-avro
> 1.6.3 or will that break other parts?
>
> Thanks,
> Peter
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


getting duplicate messages from duplicate jobs

2019-01-23 Thread Avi Levi
Hi,
This is quite confusing.
I submitted the same stateless job twice (actually I uploaded it once).
However, when I place a message on Kafka, it seems that both jobs consume
it and publish the same result (we publish the result to another Kafka
topic, so I actually see the message duplicated on Kafka). How can that be?
Both jobs are using the same group id (the group id is fixed and not generated).

Kind regards
Avi


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-23 Thread Till Rohrmann
Ufuk's proposal (having a lean default release and a user convenience
tarball) sounds good to me. That way advanced users won't be bothered by an
unnecessarily large release and new users can benefit from having many
useful extensions bundled in one tarball.

Cheers,
Till

On Wed, Jan 23, 2019 at 3:42 PM Ufuk Celebi  wrote:

> On Wed, Jan 23, 2019 at 11:01 AM Timo Walther  wrote:
> > I think what is more important than a big dist bundle is a helpful
> > "Downloads" page where users can easily find available filesystems,
> > connectors, metric reporters. Not everyone checks Maven central for
> > available JAR files. I just saw that we added an "Optional components"
> > section recently [1]; we just need to make it more prominent. This is
> > also done for the SQL connectors and formats [2].
>
> +1 I fully agree with the importance of the Downloads page. We
> definitely need to make any optional dependencies that users need to
> download easy to find.
>


Re: [Flink 1.6] How to get current total number of processed events

2019-01-23 Thread Kien Truong

Hi Nhan,

Logically, the total number of processed events before an event cannot 
be accurately calculated unless event processing is synchronized.


This is not scalable, so naturally I don't think Flink supports it.

That said, I suppose you can get an approximate count by using a 
non-keyed tumbling window: count the items inside the window, then use 
that value in the next window.
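
A rough sketch of that idea (illustrative only; "Event" is a placeholder type and
the 10-second window size is arbitrary):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Counts all events per window across all keys; windowAll runs with parallelism 1.
private static DataStream<Long> approximateTotals(DataStream<Event> events) {
    return events
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .aggregate(new AggregateFunction<Event, Long, Long>() {
            @Override public Long createAccumulator() { return 0L; }
            @Override public Long add(Event value, Long accumulator) { return accumulator + 1; }
            @Override public Long getResult(Long accumulator) { return accumulator; }
            @Override public Long merge(Long a, Long b) { return a + b; }
        });
}

The per-window counts would then have to be made available wherever the running
total is needed (e.g. via a broadcast), and as said above it is only approximate.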



Regards,

Kien


On 1/21/2019 9:34 PM, Thanh-Nhan Vo wrote:


Hello all,

I have a question, please!
I’m using Flink 1.6 to process our data in streaming mode.
I wonder if at a given event, there is a way to get the current total 
number of processed events (before this event).


If possible, I want to get this total number of processed events as a 
value state in a KeyedStream.
It means that for a given key in the KeyedStream, I want to retrieve not 
only the total number of processed events for this key but also the 
total number of processed events for all keys.


Is there a way to do this in Flink 1.6, please?

Best regard,
Nhan



Re: How to trigger a Global Window with a different Message from the window message

2019-01-23 Thread Kien Truong

Hi Oliver,

Try replacing the Global Window with a KeyedProcessFunction.

Store all the items received between CalcStart and CalcEnd inside a 
ListState, then process them when CalcEnd is received.
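
A minimal sketch of that approach (my own illustration, untested against the actual
job; "Msg" is a placeholder type that unions the RiskMeasureMessages and the
CalcStart/CalcEnd markers, and "Result" / "Result.aggregate(...)" are placeholders
too; getting the unkeyed CalcEnd marker into the keyed stream, e.g. by broadcasting
it, still has to be solved separately):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CycleAggregator extends KeyedProcessFunction<String, Msg, Result> {

    private transient ListState<Msg> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
            new ListStateDescriptor<>("cycle-buffer", Msg.class));
    }

    @Override
    public void processElement(Msg msg, Context ctx, Collector<Result> out) throws Exception {
        if (msg.isCalcEnd()) {
            // Aggregate everything buffered for this key and emit the report.
            out.collect(Result.aggregate(buffer.get()));
            buffer.clear();
        } else if (!msg.isCalcStart()) {
            buffer.add(msg);
        }
    }
}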


Regards,

Kien


On 1/17/2019 1:06 AM, Oliver Buckley-Salmon wrote:


Hi,

I have a Flink job where I receive a stream of AggregationKeys, stored 
in BroadcastState which I join in a Tuple2 with a stream of 
RiskMeasureMessages, which I then wish to aggregate in a Window.


The RiskMeasureMessages are bounded by CalcStart and CalcEnd messages 
which come on separate Kafka topics; we can ignore CalcStart but need 
to emit the aggregated results after receiving CalcEnd. The CalcEnd 
messages are unkeyed.


My issue is how to get the CalcEnd message to the Trigger to use in 
the onElement() method. The only way I can see to do it is to create a 
co-stream, but I don't see how to Window and Trigger that, or to merge the 
co-stream into a Tuple3 which would have null for the 3rd element all 
the time except when CalcEnd is received.


Are there better ways for doing this?

I’m running Flink 1.7.1 using the Java API.

Thanks in advance for your help.

Kind regards,
Oliver Buckley-Salmon









Re: [DISCUSS] Towards a leaner flink-dist

2019-01-23 Thread Ufuk Celebi
On Wed, Jan 23, 2019 at 11:01 AM Timo Walther  wrote:
> I think what is more important than a big dist bundle is a helpful
> "Downloads" page where users can easily find available filesystems,
> connectors, metric reporters. Not everyone checks Maven central for
> available JAR files. I just saw that we added an "Optional components"
> section recently [1]; we just need to make it more prominent. This is
> also done for the SQL connectors and formats [2].

+1 I fully agree with the importance of the Downloads page. We
definitely need to make any optional dependencies that users need to
download easy to find.


Re: When can the savepoint directory be deleted?

2019-01-23 Thread Kien Truong

Hi,

As of Flink 1.7, the savepoint should not be deleted until after the 
first checkpoint has been successfully taken.



https://ci.apache.org/projects/flink/flink-docs-release-1.7/release-notes/flink-1.7.html#savepoints-being-used-for-recovery


Regards,

Kien


On 1/23/2019 6:57 PM, Ben Yan wrote:

hi:

Can I delete this savepoint directory immediately after the job 
resumes running from the savepoint directory?


Best
Ben


How test and validate a data stream software?

2019-01-23 Thread Alexandre Strapacao Guedes Vianna
Hello People,

I'm conducting a study for my PhD about applications using data stream
processing, and I would like to investigate the following questions:


   - How do you test and validate data stream software?
   - Are there specific testing frameworks, tools, or testing environments?
   - What are the strategies for generating test data?


Please, could you help me by explaining how you are testing and validating your
stream-based/oriented software?

Regards Alexandre


Trouble migrating state from 1.6.3 to 1.7.1

2019-01-23 Thread pwestermann
I am trying to migrate from Flink 1.6.3 to 1.7.1 but am not able to restore
the job from a savepoint taken in 1.6.3.

We are using an AsyncFunction to publish Avro records to SQS. The state for
the AsyncWaitOperator cannot be restored because of serializer changes in
flink-avro from 1.6.3 to 1.7.1.

Any idea how to avoid this problem? Maybe start the job with flink-avro
1.6.3 or will that break other parts?

Thanks,
Peter



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


How Flink prioritise read from kafka topics and partitions ?

2019-01-23 Thread sohimankotia
Hi,

Let's say I have a Flink Kafka consumer reading from 3 topics, [ T-1, T-2, T-3 ].

- T-1 and T-2 have 100 partitions each
- T-3 has 60 partitions
- The Flink task parallelism is 50

How will Flink prioritize the Kafka topics?

If T-3 has more lag than the other topics, will Flink give higher priority to T-3?


Thanks and Regards
Sohi



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


When can the savepoint directory be deleted?

2019-01-23 Thread Ben Yan
hi:

Can I delete this savepoint directory immediately after the job resumes
running from the savepoint directory?

Best
Ben


Re: Flink Jdbc streaming source support in 1.7.1 or in future?

2019-01-23 Thread Puneet Kinra
The common way is to read the CDC (change data capture) stream. Writing a
generic operator won't be easy.

On Wed, Jan 23, 2019 at 12:45 PM Manjusha Vuyyuru 
wrote:

> But 'JDBCInputFormat' will exit once it's done reading all the data. I need
> something that keeps polling MySQL and fetches any
> updates or changes.
>
> Thanks,
> manju
>
> On Wed, Jan 23, 2019 at 7:10 AM Zhenghua Gao  wrote:
>
>> Actually, the flink-connectors/flink-jdbc module provides a JDBCInputFormat to
>> read data from a database.
>> You can give it a try.
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>

-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
*

*e-mail :puneet.ki...@customercentria.com
*


Re: No resource available error while testing HA

2019-01-23 Thread Averell
Hi Gary,

Thanks for your support.

I use flink 1.7.0. I will try to test without that -n.
Below are the JM log (on server .82) and the TM log (on server .88). I'm
sorry that I missed that TM log before asking; I thought it would not be
relevant. I just fixed the issue with the connection to ZooKeeper and the
problem was solved.

Then I have another question: when the JM cannot start/connect to the one on .88,
why didn't it try on .82, where resources are still available?

Thanks and regards,
Averell

Here is the JM log (from /mnt/var/log/hadoop-yarn/.../jobmanager.log on .82)
(it seems irrelevant. Even the earlier message regarding NoResourceAvailable
was there in GUI, but not found in the jobmanager.log file):

2019-01-23 04:15:01.869 [main] WARN 
org.apache.flink.configuration.Configuration  - Config uses deprecated
configuration key 'web.port' instead of proper key 'rest.port'
2019-01-23 04:15:03.483 [main] WARN 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint  - Upload
directory
/tmp/flink-web-08279f45-0244-4c5c-bc9b-299ac59b4068/flink-web-upload does
not exist, or has been deleted externally. Previously uploaded files are no
longer available.

And here is the TM log:
2019-01-23 11:07:07.479 [main] ERROR
o.a.flink.shaded.curator.org.apache.curator.ConnectionState  - Connection
timed out for connection string (localhost:2181) and timeout (15000) /
elapsed (56538)
org.apache.flink.shaded.curator.org.apache.curator.CuratorConnectionLossException:
KeeperErrorCode = ConnectionLoss
at
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:225)
at
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:94)
at
org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:117)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.NamespaceImpl$1.call(NamespaceImpl.java:90)
at
org.apache.flink.shaded.curator.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:109)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.NamespaceImpl.fixForNamespace(NamespaceImpl.java:83)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.fixForNamespace(CuratorFrameworkImpl.java:594)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:158)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:32)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.reset(NodeCache.java:242)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.start(NodeCache.java:175)
at
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.start(NodeCache.java:154)
at
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService.start(ZooKeeperLeaderRetrievalService.java:107)
at
org.apache.flink.runtime.taskexecutor.TaskExecutor.start(TaskExecutor.java:277)
at
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:168)
at
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:332)
at
org.apache.flink.yarn.YarnTaskExecutorRunner.lambda$run$0(YarnTaskExecutorRunner.java:142)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.yarn.YarnTaskExecutorRunner.run(YarnTaskExecutorRunner.java:141)
at
org.apache.flink.yarn.YarnTaskExecutorRunner.main(YarnTaskExecutorRunner.java:75)
2019-01-23 11:07:08.224 [main-SendThread(localhost:2181)] WARN 
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x0
for server null, unexpected error, closing socket connection and attempting
reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: No resource available error while testing HA

2019-01-23 Thread Gary Yao
Hi Averell,

What Flink version are you using? Can you attach the full logs from JM and
TMs? Since Flink 1.5, the -n parameter (number of taskmanagers) should be
omitted unless you are in legacy mode [1].

> As per that screenshot, it looks like there are 2 tasks manager still
> running (one on each host .88 and .81), which means the one on .88 has not
> been cleaned properly. If it is, then how to clean it?

The TMs should terminate if they cannot register at the JM [2].

> I wonder whether when the server with JobManager crashes, the whole job is
> restarted, or a new JobManager will try to connect to the running TMs to
> resume the job?

The whole job is restarted but any existing TM containers are reused.

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#legacy
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#taskmanager-registration-timeout

On Wed, Jan 23, 2019 at 7:19 AM Averell  wrote:

> Hello everyone,
>
> I am testing High Availability of Flink on YARN on an AWS EMR cluster.
> My configuration is an EMR with one master-node and 3 core-nodes (each with
> 16 vCores). Zookeeper is running on all nodes.
> Yarn session was created with: flink-yarn-session -n 2 -s 8 -jm 1024m -tm
> 20g
> A job with parallelism of 16 was submitted.
>
> I tried to execute the test by terminating the core node (using Linux "init
> 0") that the job manager was running on. The first few restarts worked well -
> a new job manager was elected, and the job was resumed properly.
> However, after some restarts, the new job-manager could not retrieve its
> needed resource any more (only one TM on the node with IP .81 was shown in
> the Task Managers GUI).
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Flink.png>
>
>
> I kept getting the error message
>
> "org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots within timeout of 30 ms. Slots
> required: 108, slots allocated: 60".
>
> Here below is what shown in YARN Resource Manager.
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Yarn.png>
>
>
> As per that screenshot, it looks like there are 2 tasks manager still
> running (one on each host .88 and .81), which means the one on .88 has not
> been cleaned properly. If it is, then how to clean it?
>
> I wonder whether when the server with JobManager crashes, the whole job is
> restarted, or a new JobManager will try to connect to the running TMs to
> resume the job?
>
>
> Thanks and regards,
> Averell
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-23 Thread Timo Walther
+1 for Stephan's suggestion. For example, SQL connectors have never been 
part of the main distribution and nobody has complained about this so far. I 
think what is more important than a big dist bundle is a helpful 
"Downloads" page where users can easily find available filesystems, 
connectors, and metric reporters. Not everyone checks Maven central for 
available JAR files. I just saw that we added an "Optional components" 
section recently [1]; we just need to make it more prominent. This is 
also done for the SQL connectors and formats [2].


[1] https://flink.apache.org/downloads.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#dependencies


Regards,
Timo


On 23.01.19 at 10:07, Ufuk Celebi wrote:

I like the idea of a leaner binary distribution. At the same time I
agree with Jamie that the current binary is quite convenient and
connection speeds should not be that big of a deal. Since the binary
distribution is one of the first entry points for users, I'd like to
keep it as user-friendly as possible.

What do you think about building a lean distribution by default and a
"full" distribution that still bundles all the optional dependencies
for releases? (If you don't think that's feasible I'm still +1 to only
go with the "lean dist" approach.)

– Ufuk

On Wed, Jan 23, 2019 at 9:36 AM Stephan Ewen  wrote:

There are some points where a leaner approach could help.
There are many libraries and connectors that are currently being adding to
Flink, which makes the "include all" approach not completely feasible in
long run:

   - Connectors: For a proper experience with the Shell/CLI (for example for
SQL) we need a lot of fat connector jars.
 These come often for multiple versions, which alone accounts for 100s
of MBs of connector jars.
   - The pre-bundled FileSystems are also on the verge of adding 100s of MBs
themselves.
   - The metric reporters are bit by bit growing as well.

The following could be a compromise:

The flink-dist would include
   - the core flink libraries (core, apis, runtime, etc.)
   - yarn / mesos  etc. adapters
   - examples (the examples should be a small set of self-contained programs
without additional dependencies)
   - default logging
   - default metric reporter (jmx)
   - shells (scala, sql)

The flink-dist would NOT include the following libs (and these would be
offered for individual download)
   - Hadoop libs
   - the pre-shaded file systems
   - the pre-packaged SQL connectors
   - additional metric reporters


On Tue, Jan 22, 2019 at 3:19 AM Jeff Zhang  wrote:


Thanks Chesnay for raising this discussion thread.  I think there are 3
major use scenarios for flink binary distribution.

1. Use it to set up standalone cluster
2. Use it to experience features of flink, such as via scala-shell,
sql-client
3. Downstream project use it to integrate with their system

I did a size estimation of the flink dist folder: the lib folder takes around 100M
and the opt folder takes around 200M. Overall I agree to make a thin flink dist.
So the next problem is which components to drop. I checked the opt folder,
and I think the filesystem components and metrics components could be moved
out, because they are pluggable components and are only used in scenario 1 I
think (setting up a standalone cluster). Other components like flink-table,
flink-ml, flink-gelly, we should still keep IMHO, because new users may
still use them to try the features of Flink. For me, scala-shell is the first
option to try new features of flink.



Fabian Hueske wrote on Fri, Jan 18, 2019 at 7:34 PM:


Hi Chesnay,

Thank you for the proposal.
I think this is a good idea.
We follow a similar approach already for Hadoop dependencies and
connectors (although in application space).

+1

Fabian

On Fri, Jan 18, 2019 at 10:59, Chesnay Schepler <ches...@apache.org> wrote:


Hello,

the binary distribution that we release by now contains quite a lot of
optional components, including various filesystems, metric reporters and
libraries. Most users will only use a fraction of these, and as such
pretty much only increase the size of flink-dist.

With Flink growing more and more in scope I don't believe it to be
feasible to ship everything we have with every distribution, and instead
suggest more of a "pick-what-you-need" model, where flink-dist is rather
lean and additional components are downloaded separately and added by
the user.

This would primarily affect the /opt directory, but could also be
extended to cover flink-dist. For example, the yarn and mesos code could
be spliced out into separate jars that could be added to lib manually.

Let me know what you think.

Regards,

Chesnay



--
Best Regards

Jeff Zhang





Re: Flink Jdbc streaming source support in 1.7.1 or in future?

2019-01-23 Thread Fabian Hueske
I think this is very hard to build in a generic way.
The common approach here would be to get access to the changelog stream of
the table, write it to a message queue / event log (like Kafka, Pulsar,
Kinesis, ...), and ingest the changes from the event log into a Flink
application.

You can of course build a custom source that returns all added rows if you
have some meta data in the table, e.g., timestamps indicating when a row
was added.
However, this can easily become very complex, for example if you need to
handle deletes and updates.
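
To make that concrete, a very rough sketch of such a polling source (the table and
column names, connection string and poll interval are made up; it assumes an
append-only table with a monotonically increasing updated_at column and handles
neither deletes/updates nor checkpointed state):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class PollingJdbcSource implements SourceFunction<Tuple2<Long, String>> {

    private volatile boolean running = true;
    private long lastSeen = 0L; // would need to live in checkpointed state for restarts

    @Override
    public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://host:3306/db", "user", "password")) {
            while (running) {
                try (PreparedStatement ps = conn.prepareStatement(
                        "SELECT id, payload, updated_at FROM my_table WHERE updated_at > ?")) {
                    ps.setLong(1, lastSeen);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            lastSeen = Math.max(lastSeen, rs.getLong("updated_at"));
                            synchronized (ctx.getCheckpointLock()) {
                                ctx.collect(Tuple2.of(rs.getLong("id"), rs.getString("payload")));
                            }
                        }
                    }
                }
                Thread.sleep(10_000L); // poll interval
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}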

Best, Fabian

On Wed, Jan 23, 2019 at 08:14, Manjusha Vuyyuru <vmanjusha@gmail.com> wrote:

> But 'JDBCInputFormat' will exit once it's done reading all the data. I need
> something that keeps polling MySQL and fetches any
> updates or changes.
>
> Thanks,
> manju
>
> On Wed, Jan 23, 2019 at 7:10 AM Zhenghua Gao  wrote:
>
>> Actually, the flink-connectors/flink-jdbc module provides a JDBCInputFormat to
>> read data from a database.
>> You can give it a try.
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-23 Thread Ufuk Celebi
I like the idea of a leaner binary distribution. At the same time I
agree with Jamie that the current binary is quite convenient and
connection speeds should not be that big of a deal. Since the binary
distribution is one of the first entry points for users, I'd like to
keep it as user-friendly as possible.

What do you think about building a lean distribution by default and a
"full" distribution that still bundles all the optional dependencies
for releases? (If you don't think that's feasible I'm still +1 to only
go with the "lean dist" approach.)

– Ufuk

On Wed, Jan 23, 2019 at 9:36 AM Stephan Ewen  wrote:
>
> There are some points where a leaner approach could help.
> There are many libraries and connectors that are currently being adding to
> Flink, which makes the "include all" approach not completely feasible in
> long run:
>
>   - Connectors: For a proper experience with the Shell/CLI (for example for
> SQL) we need a lot of fat connector jars.
> These come often for multiple versions, which alone accounts for 100s
> of MBs of connector jars.
>   - The pre-bundled FileSystems are also on the verge of adding 100s of MBs
> themselves.
>   - The metric reporters are bit by bit growing as well.
>
> The following could be a compromise:
>
> The flink-dist would include
>   - the core flink libraries (core, apis, runtime, etc.)
>   - yarn / mesos  etc. adapters
>   - examples (the examples should be a small set of self-contained programs
> without additional dependencies)
>   - default logging
>   - default metric reporter (jmx)
>   - shells (scala, sql)
>
> The flink-dist would NOT include the following libs (and these would be
> offered for individual download)
>   - Hadoop libs
>   - the pre-shaded file systems
>   - the pre-packaged SQL connectors
>   - additional metric reporters
>
>
> On Tue, Jan 22, 2019 at 3:19 AM Jeff Zhang  wrote:
>
> > Thanks Chesnay for raising this discussion thread.  I think there are 3
> > major use scenarios for flink binary distribution.
> >
> > 1. Use it to set up standalone cluster
> > 2. Use it to experience features of flink, such as via scala-shell,
> > sql-client
> > 3. Downstream project use it to integrate with their system
> >
> > I did a size estimation of the flink dist folder: the lib folder takes around 100M
> > and the opt folder takes around 200M. Overall I agree to make a thin flink dist.
> > So the next problem is which components to drop. I checked the opt folder,
> > and I think the filesystem components and metrics components could be moved
> > out, because they are pluggable components and are only used in scenario 1 I
> > think (setting up a standalone cluster). Other components like flink-table,
> > flink-ml, flink-gelly, we should still keep IMHO, because new users may
> > still use them to try the features of Flink. For me, scala-shell is the first
> > option to try new features of flink.
> >
> >
> >
> > Fabian Hueske wrote on Fri, Jan 18, 2019 at 7:34 PM:
> >
> >> Hi Chesnay,
> >>
> >> Thank you for the proposal.
> >> I think this is a good idea.
> >> We follow a similar approach already for Hadoop dependencies and
> >> connectors (although in application space).
> >>
> >> +1
> >>
> >> Fabian
> >>
> >> On Fri, Jan 18, 2019 at 10:59, Chesnay Schepler <ches...@apache.org> wrote:
> >>
> >>> Hello,
> >>>
> >>> the binary distribution that we release by now contains quite a lot of
> >>> optional components, including various filesystems, metric reporters and
> >>> libraries. Most users will only use a fraction of these, and as such
> >>> pretty much only increase the size of flink-dist.
> >>>
> >>> With Flink growing more and more in scope I don't believe it to be
> >>> feasible to ship everything we have with every distribution, and instead
> >>> suggest more of a "pick-what-you-need" model, where flink-dist is rather
> >>> lean and additional components are downloaded separately and added by
> >>> the user.
> >>>
> >>> This would primarily affect the /opt directory, but could also be
> >>> extended to cover flink-dist. For example, the yarn and mesos code could
> >>> be spliced out into separate jars that could be added to lib manually.
> >>>
> >>> Let me know what you think.
> >>>
> >>> Regards,
> >>>
> >>> Chesnay
> >>>
> >>>
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >


Re: GlobalWindow with custom tigger doesn't work correctly

2019-01-23 Thread David Anderson
Windowing and triggering on a keyed stream is done independently for each
key. So for each key, your custom trigger is observing when the lunumState
changes from null to a production cycle number, but it will never change
again -- because only those stream elements with the same key will be
processed in the context of that item of partitioned state.

One advantage of windowing on keyed streams is the parallelism that's made
possible by partitioning by key -- but in your case there's probably little
to be gained, assuming the production cycles are sequential, rather than
overlapping. You could proceed by (1) not keying the stream, (2) adapting
ImaginePaperWindowReportFunction to only process events for the cycle that
just ended (if necessary), and (3) writing a custom evictor to remove
events once they've been reported on.
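
To make that a bit more concrete, here is a rough, untested sketch of how (1)-(3)
could be wired together. It reuses the class names from your snippet
(ImaginePaperData, LunumTrigger, ImaginePaperWindowReportFunction); the
CycleReportJob and CycleEvictor classes are hypothetical and only illustrate the
idea. It also assumes the trigger is changed to return TriggerResult.FIRE instead
of FIRE_AND_PURGE (so the evictor, not the purge, decides what is kept) and that
the report function is adapted to extend ProcessAllWindowFunction, since the
stream is no longer keyed. Treat it as a starting point, not a drop-in solution:

import java.util.Iterator;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

public class CycleReportJob {

    // (1) no keyBy: a single global window over the whole (non-parallel) stream
    static void wireUp(DataStream<ImaginePaperData> events) {
        events
            .windowAll(GlobalWindows.create())
            .trigger(new LunumTrigger())       // fires when the cycle number changes (FIRE, not FIRE_AND_PURGE)
            .evictor(new CycleEvictor())       // hypothetical, defined below
            .process(new ImaginePaperWindowReportFunction()); // (2) adapted to a ProcessAllWindowFunction
    }

    // (3) hypothetical evictor: after the report has fired, keep only the element
    //     that carries the new cycle number, so it opens the next cycle's window contents
    public static class CycleEvictor implements Evictor<ImaginePaperData, GlobalWindow> {

        @Override
        public void evictBefore(Iterable<TimestampedValue<ImaginePaperData>> elements,
                int size, GlobalWindow window, EvictorContext ctx) {
            // nothing to evict before the window function runs
        }

        @Override
        public void evictAfter(Iterable<TimestampedValue<ImaginePaperData>> elements,
                int size, GlobalWindow window, EvictorContext ctx) {
            // the last element is the one whose cycle number caused the trigger to fire
            Integer newCycle = null;
            for (TimestampedValue<ImaginePaperData> e : elements) {
                newCycle = e.getValue().lunum;
            }
            // drop every event that does not belong to the new cycle
            Iterator<TimestampedValue<ImaginePaperData>> it = elements.iterator();
            while (it.hasNext()) {
                if (!it.next().getValue().lunum.equals(newCycle)) {
                    it.remove();
                }
            }
        }
    }
}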

On Tue, Jan 22, 2019 at 7:52 PM Daniel Krenn 
wrote:

> Hello people!
>
> I have a DataStream whose events carry a running number that signifies
> which production cycle they belong to. In essence, this is what the data
> looks like:
>
> value, production cycle
> 12.0, 2000
> 12.3, 2000 one production cycle
> 12.2, 2000
>
> 0.0, 2001
> 0.4, 2002 another production cycle
> 1.1, 2002
>
> 55.0, 2003
> 60.0, 2003 another production cycle
> 70.0, 2003
>
> I have to do some calculations over the events of each production cycle. I
> want to use Flink's window API for that. This is how I'm doing it right now:
>
> DataStream test = streamExecEnv
>     .readTextFile("C:/Projects/Python/testdata.txt")
>     .map(new ImaginePaperDataConverterTest())                  // convert data to POJO
>     .assignTimestampsAndWatermarks(new ImaginePaperAssigner()) // assign timestamps for event time
>     .keyBy((ImaginePaperData event) -> event.lunum)            // <- the production cycle number
>     .window(GlobalWindows.create())                            // create a global window
>     .trigger(new LunumTrigger())                               // "split" the window with a custom trigger
>     .process(new ImaginePaperWindowReportFunction());          // apply a function over the aggregated events
>
> I'm getting a "DataStream" out of a text file, just for testing purposes.
> The problem is that this only aggregates a single event per production
> cycle. Why is that? I thought keying the stream by the production cycle
> number already partitions the stream anyway. The trigger is supposed to
> start a new global window whenever the production cycle number changes and
> aggregate the events of the current window. What am I missing here?
> Just to be safe, here is my implementation of the custom trigger:
>
> public class LunumTrigger extends Trigger<ImaginePaperData, GlobalWindow> {
>
>     private static final long serialVersionUID = 1L;
>
>     public LunumTrigger() {}
>
>     private final ValueStateDescriptor<Integer> prevLunum =
>         new ValueStateDescriptor<>("lunum", Integer.class);
>
>     @Override
>     public TriggerResult onElement(ImaginePaperData element, long timestamp,
>             GlobalWindow window, TriggerContext ctx) throws Exception {
>
>         ValueState<Integer> lunumState = ctx.getPartitionedState(prevLunum);
>
>         if (lunumState.value() == null || !(element.lunum.equals(lunumState.value()))) {
>             System.out.println("LUNUM BEFORE: " + lunumState.value()
>                 + " NEW LUNUM: " + element.lunum + " ==> FIRE!");
>             lunumState.update(element.lunum);
>             return TriggerResult.FIRE_AND_PURGE;
>         }
>
>         System.out.println("LUNUM BEFORE: " + lunumState.value()
>             + " NEW LUNUM: " + element.lunum + " ==> HOLD!");
>         lunumState.update(element.lunum);
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onProcessingTime(long time, GlobalWindow window,
>             TriggerContext ctx) throws Exception {
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onEventTime(long time, GlobalWindow window,
>             TriggerContext ctx) throws Exception {
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
>         ctx.getPartitionedState(prevLunum).clear();
>     }
> }
>
> I'm very grateful for your help.
>
> Regards,
>
> Daniel
>
>

-- 
*David Anderson* | Training Coordinator
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-23 Thread Stephan Ewen
There are some points where a leaner approach could help.
There are many libraries and connectors currently being added to Flink,
which makes the "include all" approach not completely feasible in the
long run:

  - Connectors: For a proper experience with the Shell/CLI (for example for
SQL) we need a lot of fat connector jars.
These often come in multiple versions, which alone accounts for hundreds
of MBs of connector jars.
  - The pre-bundled FileSystems are also on the verge of adding hundreds of
MBs themselves.
  - The metric reporters are growing bit by bit as well.

The following could be a compromise:

The flink-dist would include
  - the core flink libraries (core, apis, runtime, etc.)
  - yarn / mesos  etc. adapters
  - examples (the examples should be a small set of self-contained programs
without additional dependencies)
  - default logging
  - default metric reporter (jmx)
  - shells (scala, sql)

The flink-dist would NOT include the following libs (and these would be
offered for individual download)
  - Hadoop libs
  - the pre-shaded file systems
  - the pre-packaged SQL connectors
  - additional metric reporters


On Tue, Jan 22, 2019 at 3:19 AM Jeff Zhang  wrote:

> Thanks Chesnay for raising this discussion thread. I think there are 3
> major use scenarios for the flink binary distribution.
>
> 1. Use it to set up a standalone cluster
> 2. Use it to experience features of flink, for example via the scala-shell
> or sql-client
> 3. Downstream projects use it to integrate with their systems
>
> I did a size estimation of the flink dist folder: the lib folder takes
> around 100M and the opt folder around 200M. Overall I agree to make a thin
> flink dist. So the next problem is which components to drop. I checked the
> opt folder, and I think the filesystem and metrics components could be
> moved out, because they are pluggable and, I think, only used in scenario 1
> (setting up a standalone cluster). Other components like flink-table,
> flink-ml and flink-gelly we should still keep IMHO, because new users may
> still use them to try out the features of flink. For me, the scala-shell is
> the first option for trying new features of flink.
>
>
>
> Fabian Hueske  wrote on Fri, Jan 18, 2019 at 7:34 PM:
>
>> Hi Chesnay,
>>
>> Thank you for the proposal.
>> I think this is a good idea.
>> We follow a similar approach already for Hadoop dependencies and
>> connectors (although in application space).
>>
>> +1
>>
>> Fabian
>>
>> On Fri, Jan 18, 2019 at 10:59 AM, Chesnay Schepler <
>> ches...@apache.org> wrote:
>>
>>> Hello,
>>>
>>> the binary distribution that we currently release contains quite a lot of
>>> optional components, including various filesystems, metric reporters and
>>> libraries. Most users will only use a fraction of these, so the rest pretty
>>> much only increases the size of flink-dist.
>>>
>>> With Flink growing more and more in scope I don't believe it to be
>>> feasible to ship everything we have with every distribution, and instead
>>> suggest more of a "pick-what-you-need" model, where flink-dist is rather
>>> lean and additional components are downloaded separately and added by
>>> the user.
>>>
>>> This would primarily affect the /opt directory, but could also be
>>> extended to cover flink-dist. For example, the yarn and mesos code could
>>> be spliced out into separate jars that could be added to lib manually.
>>>
>>> Let me know what you think.
>>>
>>> Regards,
>>>
>>> Chesnay
>>>
>>>
>
> --
> Best Regards
>
> Jeff Zhang
>