Re: JobManager shows TaskManager was lost/killed while TaskManager process is still running and the network is OK.

2017-11-09 Thread Rahul Raj
Hi all,

I am facing the same issue as well. My job fails after running for 15 hours,
throwing the same "TaskManager lost/killed" exception. Could you please explain
the possible solution for this in more detail?

Rahul Raj

On 15 September 2017 at 23:06, AndreaKinn  wrote:

> Hi, sorry for reviving this old conversation.
> I have exactly the same problem; can you provide more details about your
> solution?
> Did you use another garbage collector such as G1? How can I set it?
>
> I've seen in the configuration guidelines that I have to set the option
> env.java.opts,
> but I don't know which value to insert to enable G1 (see the sketch at the
> end of this thread).
>
>
> Renkai wrote
> > The ZooKeeper-related logs are logged by user code. I finally found the
> > reason why the TaskManager was lost: I had given the TaskManager a large
> > amount of memory, and the JobManager considered the TaskManager down while
> > the TaskManager was in a full GC. Thanks for your help.
>
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
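
For reference, a minimal sketch of the flink-conf.yaml entry the G1 question
above is about, assuming the standard env.java.opts key (check the
configuration page of your Flink version for the exact option names):

    env.java.opts: "-XX:+UseG1GC"

This only switches the collector; heap sizes are still controlled by the usual
jobmanager.heap.mb / taskmanager.heap.mb settings.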


Testing / Configuring event windows with Table API and SQL

2017-11-09 Thread Colin Williams
Hello,

I've been given some Flink application code and asked to implement and
ensure that our query is updated for late-arriving entries. We're currently
creating a table using a tumbling-window SQL query similar to the first example in


https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#group-windows

We then turn the result table back into a datastream using toAppendStream,
and eventually add a derivative stream to a sink. We've configured
TimeCharacteristic to event-time processing.

From reading the documentation I was trying to configure using
withIdleStateRetentionTime, with the expectation that this setting would
allow me to deal with late arrivals past a given watermark time, but within
the retention time.

Then, to test this, I created a simple source which triggers the watermark,
so that the next element is a late arrival. However, so far the watermark seems
to cause the late arrival to be discarded. And in my test sink, where I'm trying
to capture all emitted outputs and hopefully the updated value, I don't find
one.

So it seems that either my understanding of how to deal with late events or my
test setup is wrong. Can anyone spot what I'm doing wrong?


Best,

Colin Williams
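
For context, a minimal sketch of how withIdleStateRetentionTime is usually
wired into such a Table API job; tableEnv stands for the StreamTableEnvironment
and resultTable for the table produced by the tumbling-window query, and the
exact query-config API can differ slightly between Flink releases:

StreamQueryConfig qConfig = tableEnv.queryConfig();
// keep state for idle keys between 12 and 24 hours before cleaning it up
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

// resultTable is the Table produced by the tumbling-window GROUP BY query
DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class, qConfig);

Note that this setting governs how long state for idle keys is retained, so
whether it helps with late arrivals depends on how the group window itself
treats elements that are behind the watermark.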


Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

2017-11-09 Thread Vergilio, Thalita
Hi Till,


I have made some progress with the name resolution for machines that are not in 
the same subnet. The problem I am facing now is Flink-specific, so I wonder if 
you could help me.


It is all running fine in a multi-cloud setup with the jobmanager in Azure and 
the taskmanager in the Google cloud. However, when I scale the taskmanager up 
and it starts running on Azure nodes as well, I get an Akka error which I 
presume means the taskmanagers can't talk to each other when parallelising the 
task.


Do you know what the IP address and port below are? Are they assigned by Flink?


Thank you very much.


Thalita


java.lang.Exception: Cannot deploy task Source: Read(UnboundedKafkaSource) -> 
Flat Map -> KafkaPuePipelineProcessor/Window.Into()/Window.Assign.out -> 
ParMultiDo(Anonymous) -> ToKeyedWorkItem (2/3) 
(b9f31626fb7d83d39e24e570e034f03e) - TaskManager 
(3a9c37463c88510a44097df0c99b5f90 @ 172.18.0.3 (dataPort=38963)) not responding 
after a timeout of 1 ms
at 
org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:437)
at 
org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:429)
at 
org.apache.flink.runtime.concurrent.impl.FlinkFuture$3.recover(FlinkFuture.java:201)
at akka.dispatch.Recover.internal(Future.scala:268)
at akka.dispatch.japi$RecoverBridge.apply(Future.scala:184)
at akka.dispatch.japi$RecoverBridge.apply(Future.scala:182)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Failure.recover(Try.scala:185)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka.tcp://flink@172.18.0.3:37959/user/taskmanager#364916492]] after 
[1 ms]
at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at 
scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
at 
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
at 
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
at 
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
... 1 more



From: Till Rohrmann 
Sent: 06 November 2017 13:48:59
To: Vergilio, Thalita
Cc: Piotr Nowojski; user@flink.apache.org; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if 
they are on different nodes

I'm not entirely sure how docker swarm works but from the Flink perspective 
there mustn't be two TaskManagers running on the same host (meaning an entity 
where you share the same address) if you set the TaskManager data port to a 
fixed value (otherwise only one of them can be started due to port conflicts). 
If you can ensure that this is the case, then it should be safe to specify a 
port for the data transmission.

Cheers,
Till
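
For reference, the fixed data port Till mentions is a plain flink-conf.yaml
entry (a sketch; the default of 0 lets the OS pick an ephemeral port at
runtime, which is the behaviour the documentation describes):

    taskmanager.data.port: 6121

With Docker Swarm this only works if at most one TaskManager ends up on each
host/network namespace, since two TaskManagers sharing an address would collide
on that port.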

On Mon, Nov 6, 2017 at 2:37 PM, Vergilio, Thalita 
mailto:t.vergilio4...@student.leedsbeckett.ac.uk>>
 wrote:

Hi Till,


Thanks a lot for your answer.


Is the taskmanager.data.port unique per TaskManager? The documentation says it 
is assigned at runtime by the OS. My thinking here is that you would need to 
know what that is at service creation time, which would go against the whole 
idea of how services are scaled in Docker Swarm.


When you create a Swarm service using 'docker stack deploy' or 'docker service 
create', the configuration that is used at that point is the same that will be 
used by all instances of the service. If you then scale TaskManager to 8 or 10 
containers, each of them gets the same service configuration (the one used to 
create the service).


I have in fact tried to map specific ports in the TaskManager service 
configuration, 

Re: Generate watermarks per key in a KeyedStream

2017-11-09 Thread Derek VerLee

  
  
We are contending with the same issue, as it happens. We have dozens, and
potentially down the line may need to deal with thousands, of different "time
systems" as you put it, and they may not be known at compile time or job start
time. In a practical sense, how could such a system be composed?

On 11/9/17 5:52 AM, Shailesh Jain wrote:

> Thanks for your reply, Xingcan.
>
> On Wed, Nov 8, 2017 at 10:42 PM, Xingcan Cui wrote:
>
>> Hi Shailesh,
>>
>> actually, the watermarks are generated per partition, but all of them will
>> be forcibly aligned to the minimum one during processing. That is decided by
>> the semantics of watermark and KeyedStream, i.e., the watermarks belong to a
>> whole stream and a stream is made up of different partitions (one per key).
>>
>> If the physical devices work in different time systems due to delay, the
>> event streams from them should be treated separately.
>>
>> Hope that helps.
>>
>> Best,
>> Xingcan
>>
>> On Wed, Nov 8, 2017 at 11:48 PM, Shailesh Jain wrote:
>>
>>> Hi,
>>>
>>> I'm working on implementing a use case wherein different physical devices
>>> are sending events, and due to network/power issues, there can be a delay
>>> in receiving events at the Flink source. One of the operators within the
>>> Flink job is the Pattern operator, and there are certain patterns which
>>> are time sensitive, so I'm using the event time characteristic. But the
>>> problem comes when there are unpredictable delays in events from a
>>> particular device(s), which causes those events to be dropped (as I cannot
>>> really define a static bound to allow for lateness).
>>>
>>> Since I'm using a KeyedStream, keyed on the source device ID, is there a
>>> way to allow each CEP operator instance (one per key) to progress its time
>>> based on the event time in the corresponding stream partition? Or in other
>>> words, is there a way to generate watermarks per partition in a
>>> KeyedStream?
>>>
>>> Thanks,
>>> Shailesh


Re: Do timestamps and watermarks exist after window evaluation?

2017-11-09 Thread Derek VerLee

  
  
This new documentation seems to answer my question directly. It's good to know
my intuitions were not wildly off. Also, thank you for continuing to improve
the already good documentation. Funny enough, some of the other questions I
have were also asked by other users in the last couple of days, so I'll just
reply on those threads if necessary.

Thanks!

On 11/9/17 9:32 AM, Aljoscha Krettek wrote:

> Hi,
>
> This new section in the windowing documentation will help answer your
> question:
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#working-with-window-results
>
> Please let me know if you have any further questions. :-)
>
> Best,
> Aljoscha
>
>> On 8. Nov 2017, at 18:54, Derek VerLee wrote:
>>
>> When composing ("chaining") multiple windowing operations on the same
>> stream, are watermarks transmitted downstream after window evaluation, and
>> are the records emitted from WindowFunctions given timestamps? Do I need
>> to, or should I always, assignTimestampsAndWatermarks to the outputs of
>> window evaluations if I want to window again? If automatically assigned,
>> how should I think about them in an event time context? Would the event
>> time of a record resulting from a WindowFunction be the window's end time
>> in the case of a TimeWindow?

Re: Correlation between data streams/operators and threads

2017-11-09 Thread Shailesh Jain
On 1. - is it tied specifically to the number of source operators or to the
number of DataStream objects created? I mean, does the answer change if I
read all the data from a single Kafka topic, get a DataStream of all
events, and then apply N filters to create N individual streams?

On 3. - the problem with partitions is that watermarks cannot be different
per partition, and since in this use case each stream is from a device,
the latency could be different (but order will be correct almost always)
and there are high chances of losing out on events in operators like
Patterns which work with windows. Any ideas for workarounds here?


Thanks,
Shailesh

On 09-Nov-2017 8:48 PM, "Piotr Nowojski"  wrote:

Hi,

1.
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html

The number of threads executing would be, roughly speaking, equal to the
number of input data streams multiplied by the parallelism.

2.
Yes, you could dynamically create more data streams at the job startup.

3.
Running 1 independent data streams on a small cluster (couple of nodes)
will definitely be an issue, since even with parallelism set to 1, there
would be quite a lot of unnecessary threads.

It would be much better to treat your data as a single data input stream
with multiple partitions. You could assign partitions between source
instances based on parallelism. For example with parallelism 6:
- source 0 could get partitions 0, 6, 12, 18
- source 1, could get partitions 1, 7, …
…
- source 5, could get partitions 5, 11, ...

Piotrek

On 9 Nov 2017, at 10:18, Shailesh Jain  wrote:

Hi,

I'm trying to understand the runtime aspect of Flink when dealing with
multiple data streams and multiple operators per data stream.

Use case: N data streams in a single flink job (each data stream
representing 1 device - with different time latencies), and each of these
data streams gets split into two streams, of which one goes into a bunch of
CEP operators, and one into a process function.

Questions:
1. At runtime, will the engine create one thread per data stream? Or one
thread per operator?
2. Is it possible to dynamically create a data stream at runtime when the
job starts? (i.e. if N is read from a file when the job starts and
corresponding N streams need to be created)
3. Are there any specific performance impacts when a large number of
streams (N ~ 1) are created, as opposed to N partitions within a single
stream?

Are there any internal (design) documents which can help understanding the
implementation details? Any references to the source will also be really
helpful.

Thanks in advance.

Shailesh


Re: Unsubscribe

2017-11-09 Thread Gary Yao
Hi Paolo,

If you haven't done so already, you need to write to

user-unsubscr...@flink.apache.org

to unsubscribe.

Best,

Gary

On Thu, Nov 9, 2017 at 5:28 PM, Paolo Cristofanelli <
cristofanelli.pa...@gmail.com> wrote:

> Hi,
> I would like to unsubscribe from the mailing list.
>
> Best,
> Paolo
>


Re: Flink memory leak

2017-11-09 Thread Piotr Nowojski
Hi,

Could you attach the full logs from those task managers? At first glance I don’t 
see a connection between those exceptions and any memory issue that you might 
have had. It looks like a dependency issue in one (some? all?) of your jobs.

Did you build your jars with the -Pbuild-jar profile, as described here?
https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project

If that doesn’t help, can you binary search which job is causing the problem? 
There might be some Flink incompatibility between different versions, and 
rebuilding a job’s jar with a version matching the cluster version might 
help.

Piotrek


> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>  wrote:
> 
> On 2017-11-08 18:30, Piotr Nowojski wrote:
>> Btw, Ebru:
>> I don’t agree that the main suspect is NetworkBufferPool. On your
>> screenshots it’s memory consumption was reasonable and stable: 596MB
>> -> 602MB -> 597MB.
>> PoolThreadCache memory usage ~120MB is also reasonable.
>> Do you experience any problems, like Out Of Memory errors/crashes/long
>> GC pauses? Or just JVM process is using more memory over time? You are
>> aware that JVM doesn’t like to release memory back to OS once it was
>> used? So increasing memory usage until hitting some limit (for example
>> JVM max heap size) is expected behaviour.
>> Piotrek
>>> On 8 Nov 2017, at 15:48, Piotr Nowojski 
>>> wrote:
>>> I don’t know if this is relevant to this issue, but I was
>>> constantly getting failures trying to reproduce this leak using your
>>> job, because you were using a non-deterministic getKey function:
>>> @Override
>>> public Integer getKey(Integer event) {
>>> Random randomGen = new Random((new Date()).getTime());
>>> return randomGen.nextInt() % 8;
>>> }
>>> And quoting Java doc of KeySelector:
>>> "If invoked multiple times on the same object, the returned key must
>>> be the same.”
>>> I’m trying to reproduce this issue with following job:
>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3
>>> Where IntegerSource is just an infinite source, DisardingSink is
>>> well just discarding incoming data. I’m cancelling the job every 5
>>> seconds and so far (after ~15 minutes) my memory consumption is
>>> stable, well below maximum java heap size.
>>> Piotrek
 On 8 Nov 2017, at 15:28, Javier Lopez 
 wrote:
 Yes, I tested with just printing the stream. But it could take a
 lot of time to fail.
 On Wednesday, 8 November 2017, Piotr Nowojski
  wrote:
> Thanks for quick answer.
> So it will also fail after some time with `fromElements` source
 instead of Kafka, right?
> Did you try it also without a Kafka producer?
> Piotrek
> On 8 Nov 2017, at 14:57, Javier Lopez 
 wrote:
> Hi,
> You don't need data. With data it will die faster. I tested as
 well with a small data set, using the fromElements source, but it
 will take some time to die. It's better with some data.
> On 8 November 2017 at 14:54, Piotr Nowojski
  wrote:
>> Hi,
>> Thanks for sharing this job.
>> Do I need to feed some data to the Kafka to reproduce this
 issue with your script?
>> Does this OOM issue also happen when you are not using the
 Kafka source/sink?
>> Piotrek
>> On 8 Nov 2017, at 14:08, Javier Lopez 
 wrote:
>> Hi,
>> This is the test flink job we created to trigger this leak
 https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
>> And this is the python script we are using to execute the job
 thousands of times to get the OOM problem
 https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
>> The cluster we used for this has this configuration:
>> Instance type: t2.large
>> Number of workers: 2
>> HeapMemory: 5500
>> Number of task slots per node: 4
>> TaskMangMemFraction: 0.5
>> NumberOfNetworkBuffers: 2000
>> We have tried several things, increasing the heap, reducing the
 heap, more memory fraction, changes this value in the
 taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to
 work.
>> Thanks for your help.
>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
  wrote:
>>> On 2017-11-08 15:20, Piotr Nowojski wrote:
 Hi Ebru and Javier,
 Yes, if you could share this example job it would be helpful.
 Ebru: could you explain in a little more details how does
 your Job(s)
 look like? Could you post some code? If you are just using
 maps and
 filters there shouldn’t be any network transfers involved,
 aside
 from Source and Sink functions.
 Piotrek
> On 8 Nov 2017, at 12:54, ebru
  wrote:
> Hi Javier,
> It would be helpful if you share your test job with us.
> Which configurat

Streaming : a way to "key by partition id" without redispatching data

2017-11-09 Thread Gwenhael Pasquiers
Hello,

(Flink 1.2.1)

For performances reasons I'm trying to reduce the volume of data of my stream 
as soon as possible by windowing/folding it for 15 minutes before continuing to 
the rest of the chain that contains keyBys and windows that will transfer data 
everywhere.

Because of the huge volume of data, I want to avoid "moving" the data between 
partitions as much as possible (not like a naïve KeyBy does). I wanted to 
create a custom ProcessFunction (using timer and state to fold data for X 
minutes) in order to fold my data over itself before keying the stream but even 
ProcessFunction needs a keyed stream...

Is there a specific "key" value that would guarantee that my data won't be 
moved to another taskmanager (i.e. that its hashcode will match the partition it is 
already in)? I thought about the subtask id but I doubt I'd be that lucky :-)

Suggestions:

- Wouldn't it be useful to be able to do a "partitionnedKeyBy" that
would not move data between nodes, for windowing operations that can be
parallelized?

  - Something like kafka => partitionnedKeyBy(0) => first folding => keyBy(0)
=> second folding => ...

- Finally, aren't all streams keyed? Even if they're keyed by a
totally arbitrary partition id until the user chooses its own key, shouldn't we
be able to do a window (not windowAll) or process over any normal Stream's
partition?

B.R.

Gwenhaël PASQUIERS


Re: Flink memory leak

2017-11-09 Thread ÇETİNKAYA EBRU ÇETİNKAYA EBRU

On 2017-11-08 18:30, Piotr Nowojski wrote:

Btw, Ebru:

I don’t agree that the main suspect is NetworkBufferPool. On your
screenshots it’s memory consumption was reasonable and stable: 596MB
-> 602MB -> 597MB.

PoolThreadCache memory usage ~120MB is also reasonable.

Do you experience any problems, like Out Of Memory errors/crashes/long
GC pauses? Or just JVM process is using more memory over time? You are
aware that JVM doesn’t like to release memory back to OS once it was
used? So increasing memory usage until hitting some limit (for example
JVM max heap size) is expected behaviour.

Piotrek


On 8 Nov 2017, at 15:48, Piotr Nowojski 
wrote:

I don’t know if this is relevant to this issue, but I was
constantly getting failures trying to reproduce this leak using your
job, because you were using a non-deterministic getKey function:

@Override
public Integer getKey(Integer event) {
Random randomGen = new Random((new Date()).getTime());
return randomGen.nextInt() % 8;
}

And quoting Java doc of KeySelector:

"If invoked multiple times on the same object, the returned key must
be the same.”

I’m trying to reproduce this issue with following job:

https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3

Where IntegerSource is just an infinite source, DisardingSink is
well just discarding incoming data. I’m cancelling the job every 5
seconds and so far (after ~15 minutes) my memory consumption is
stable, well below maximum java heap size.

Piotrek
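
For comparison, a deterministic variant of the getKey function quoted above
could look like the sketch below; any pure function of the event works, the
point being that the key must not depend on wall-clock time or a fresh Random:

@Override
public Integer getKey(Integer event) {
    // The key is derived from the event only, so repeated calls on the
    // same element always return the same key.
    return Math.abs(event % 8);
}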


On 8 Nov 2017, at 15:28, Javier Lopez 
wrote:
Yes, I tested with just printing the stream. But it could take a
lot of time to fail.

On Wednesday, 8 November 2017, Piotr Nowojski
 wrote:

Thanks for quick answer.
So it will also fail after some time with `fromElements` source

instead of Kafka, right?

Did you try it also without a Kafka producer?
Piotrek

On 8 Nov 2017, at 14:57, Javier Lopez 

wrote:

Hi,
You don't need data. With data it will die faster. I tested as

well with a small data set, using the fromElements source, but it
will take some time to die. It's better with some data.

On 8 November 2017 at 14:54, Piotr Nowojski

 wrote:


Hi,
Thanks for sharing this job.
Do I need to feed some data to the Kafka to reproduce this

issue with your script?

Does this OOM issue also happen when you are not using the

Kafka source/sink?

Piotrek

On 8 Nov 2017, at 14:08, Javier Lopez 

wrote:

Hi,
This is the test flink job we created to trigger this leak

https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6

And this is the python script we are using to execute the job

thousands of times to get the OOM problem
https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107

The cluster we used for this has this configuration:

Instance type: t2.large
Number of workers: 2
HeapMemory: 5500
Number of task slots per node: 4
TaskMangMemFraction: 0.5
NumberOfNetworkBuffers: 2000

We have tried several things, increasing the heap, reducing the

heap, more memory fraction, changes this value in the
taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to
work.

Thanks for your help.
On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU

 wrote:


On 2017-11-08 15:20, Piotr Nowojski wrote:


Hi Ebru and Javier,

Yes, if you could share this example job it would be helpful.

Ebru: could you explain in a little more details how does

your Job(s)

look like? Could you post some code? If you are just using

maps and

filters there shouldn’t be any network transfers involved,

aside

from Source and Sink functions.

Piotrek


On 8 Nov 2017, at 12:54, ebru

 wrote:


Hi Javier,

It would be helpful if you share your test job with us.
Which configurations did you try?

-Ebru

On 8 Nov 2017, at 14:43, Javier Lopez



wrote:

Hi,

We have been facing a similar problem. We have tried some

different

configurations, as proposed in other email thread by Flavio

and

Kien, but it didn't work. We have a workaround similar to

the one

that Flavio has, we restart the taskmanagers once they reach

a

memory threshold. We created a small test to remove all of

our

dependencies and leave only flink native libraries. This

test reads

data from a Kafka topic and writes it back to another topic

in

Kafka. We cancel the job and start another every 5 seconds.

After

~30 minutes of doing this process, the cluster reaches the

OS memory

limit and dies.

Currently, we have a test cluster with 8 workers and 8 task

slots

per node. We have one job that uses 56 slots, and we cannot

execute

that job 5 times in a row because the whole cluster dies. If

you

want, we can publish our test job.

Regards,

On 8 November 2017 at 11:20, Aljoscha Krettek



wrote:

@Nico & @Piotr Could you please have a look at this? You

both

recently worked on the network stack and might be most

familiar with

this.

On 8. Nov 2017, at 10:25, Flavio Pompermaier



wrote:

We also have the same problem in production. At the moment

the

solution is to restart the entire Flink 

Unsubscribe

2017-11-09 Thread Paolo Cristofanelli
Hi,
I would like to unsubscribe from the mailing list.

Best,
Paolo


Re: How to best create a bounded session window ?

2017-11-09 Thread Piotr Nowojski
Indeed, you are unfortunately right. Triggers do not define/control the lifecycle of 
the window, so it could happen that each new event constantly pushes the 
leading boundary of the window, while your custom trigger constantly 
triggers and purges that single event (because the max window length was exceeded). 
So probably my example of events:
1 2 3 4 6 7 8 5
Would result in following fired windows:
[1 2 3 4 6] (window time from 1 to 6) +
[7] - (window time from 1 to 7) +
[8] - (window time from 1 to 8) +
[5] - (window time from 1 to 8)

I’m not sure if you can work around this issue. You would have to either 
implement your own custom WindowOperator that behaves differently, or you could copy 
the code and add a new TriggerResult - FIRE_PURGE_AND_DROP_WINDOW. The latter one 
could maybe be contributed back into Flink (it should be discussed in some ticket 
first).

Piotrek

> On 9 Nov 2017, at 15:27, Vishal Santoshi  wrote:
> 
> Thanks you for the response. 
> 
> I would not mind the second scenario as in a second window, which 
> your illustration suggests with a custom trigger approach, I am not certain  
> though that triggers  define the lifecycle of a window, as in a trigger 
> firing does not necessarily imply a Garbage Collectable Window.  It should be 
> GCed only after the watermark exceeds a hypothetically ever increasing window 
> leading boundary by a lag. In a some case that might never happen as in the 
> leading boundary is forever increasing. We may decide to fire_and_purge. fire 
> etc but the window remains live.  Or did I get that part wrong ? 
> 
> 
> Vishal.
> 
> 
> 
> 
> On Thu, Nov 9, 2017 at 8:24 AM, Piotr Nowojski  > wrote:
> It might be more complicated if you want to take into account events coming 
> in out of order. For example you limit length of window to 5 and you get the 
> following events:
> 
> 1 2 3 4 6 7 8 5
> 
> Do you want to emit windows:
> 
> [1 2 3 4 5] (length limit exceeded) + [6 7 8] ?
> 
> Or are you fine with interleaving windows in case of out of order:
> 
> [1 2 3 4 6] + [5 7 8] 
> 
> If the latter one, some custom Trigger should be enough for you. If not, you 
> would need to implement hypothetical MergingAndSplitableWindowAssigner, that 
> after encountering late event “5” could split previously created windows. 
> Unfortunately such feature is not supported by a WindowOperator, so you would 
> have to implement your own operator for this.
> 
> Regardless of your option remember to write some integration tests:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html#integration-testing
>  
> 
> 
> Piotrek
> 
>> On 8 Nov 2017, at 21:43, Vishal Santoshi > > wrote:
>> 
>> I am implementing a bounded session window but I require to short circuit 
>> the session if the session length ( in count of events or time ) go beyond a 
>> configured limit , a very reasonable scenario ( bot etc ) . I am using the 
>> approach as listed. I am not sure though if the Window itself is being 
>> terminated and if that is even feasible. Any other approach or advise ?  
>> 
>> public class BoundedEventTimeTrigger extends Trigger<Object, TimeWindow> {
>> private static final long serialVersionUID = 1L;
>> long maxSessionTime;
>> 
>> ValueState<Boolean> doneState;
>> private final ValueStateDescriptor<Boolean> cleanupStateDescriptor =
>> new ValueStateDescriptor<>("done", Boolean.class );
>> 
>> private BoundedEventTimeTrigger(long maxSessionTime) {
>> this.maxSessionTime = maxSessionTime;
>> }
>> 
>> /**
>>  * Creates an event-time trigger that fires once the watermark passes 
>> the end of the window.
>>  * 
>>  * Once the trigger fires all elements are discarded. Elements that 
>> arrive late immediately
>>  * trigger window evaluation with just this one element.
>>  */
>> public static BoundedEventTimeTrigger create(long maxSessionLengh) {
>> return new BoundedEventTimeTrigger(maxSessionLengh);
>> }
>> 
>> @Override
>> public TriggerResult onElement(Object element, long timestamp, 
>> TimeWindow window, TriggerContext ctx) throws Exception {
>> if(cleanupState!=null && cleanupState.value()!=null && 
>> cleanupState.value()) {
>> return TriggerResult.CONTINUE;
>> }
>> if(timestamp - window.getStart() > maxSessionTime){
>> System.out.println(new Date(timestamp) + "\t" + new 
>> Date(window.getStart()));
>> try {
>> doneState = ctx.getPartitionedState(cleanupStateDescriptor);
>> doneState.update(true);
>> return TriggerResult.FIRE_AND_PURGE;
>> } catch (IOException e) {
>> throw new RuntimeException("Failed to update state", e);
>> }
>> }
>> 
>> if (window.maxTimest

Re: Correlation between data streams/operators and threads

2017-11-09 Thread Piotr Nowojski
Hi,

1. 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html 


The number of threads executing would be, roughly speaking, equal to the number of 
input data streams multiplied by the parallelism.

2. 
Yes, you could dynamically create more data streams at the job startup.

3.
Running 1 independent data streams on a small cluster (couple of nodes) 
will definitely be an issue, since even with parallelism set to 1, there would 
be quite a lot of unnecessary threads. 

It would be much better to treat your data as a single data input stream with 
multiple partitions. You could assign partitions between source instances based 
on parallelism. For example with parallelism 6:
- source 0 could get partitions 0, 6, 12, 18
- source 1, could get partitions 1, 7, …
…
- source 5, could get partitions 5, 11, ...

Piotrek
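
A small sketch of the partition assignment described above, as it could look
inside the open() method of a RichParallelSourceFunction
(totalNumberOfPartitions is a placeholder for however many partitions your
input actually has):

int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();

List<Integer> myPartitions = new ArrayList<>();
for (int partition = 0; partition < totalNumberOfPartitions; partition++) {
    if (partition % parallelism == subtaskIndex) {
        // e.g. subtask 0 reads partitions 0, 6, 12, 18 when parallelism is 6
        myPartitions.add(partition);
    }
}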

> On 9 Nov 2017, at 10:18, Shailesh Jain  wrote:
> 
> Hi,
> 
> I'm trying to understand the runtime aspect of Flink when dealing with 
> multiple data streams and multiple operators per data stream.
> 
> Use case: N data streams in a single flink job (each data stream representing 
> 1 device - with different time latencies), and each of these data streams 
> gets split into two streams, of which one goes into a bunch of CEP operators, 
> and one into a process function.
> 
> Questions:
> 1. At runtime, will the engine create one thread per data stream? Or one 
> thread per operator?
> 2. Is it possible to dynamically create a data stream at runtime when the job 
> starts? (i.e. if N is read from a file when the job starts and corresponding 
> N streams need to be created)
> 3. Are there any specific performance impacts when a large number of streams 
> (N ~ 1) are created, as opposed to N partitions within a single stream?
> 
> Are there any internal (design) documents which can help understanding the 
> implementation details? Any references to the source will also be really 
> helpful.
> 
> Thanks in advance.
> 
> Shailesh
> 
> 



Re: Weird performance on custom Hashjoin w.r.t. parallelism

2017-11-09 Thread Piotr Nowojski
Hi,

Yes, as you correctly analysed, parallelism 1 was causing problems, because it 
meant that all of the records must have been gathered over the network from all of 
the task managers. Keep in mind that even if you increase parallelism to “p”, 
every change in parallelism can slow down your application, because events will 
have to be redistributed, which in most cases means network transfers. 

For measuring throughput you could use already defined metrics in Flink:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html
 


You can get the list of vertices of your job:
http://<jobmanager>:8081/jobs/<job-id>/vertices
Then statistics:
http://<jobmanager>:8081/jobs/<job-id>/vertices/<vertex-id>/metrics

For example:
http://localhost:8081/jobs/34c6f7d00cf9b3ebfff4d94ad465eb23/vertices
http://localhost:8081/jobs/34c6f7d00cf9b3ebfff4d94ad465eb23/vertices/3d144c2a0fc19115f5f075ba85deac26/metrics

You can also try to aggregate them:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#rest-api-integration
 


Piotrek
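
As a usage sketch, assuming the endpoints above and the metrics endpoint's get
query parameter for selecting metrics (the metric names to pass are the ones
returned when you query the endpoint without parameters, which typically
include throughput metrics such as per-subtask numRecordsOutPerSecond):

curl "http://localhost:8081/jobs/<job-id>/vertices/<vertex-id>/metrics?get=<metric-name>"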

> On 9 Nov 2017, at 07:53, m@xi  wrote:
> 
> Hello!
> 
> I found out that the cause of the problem was the map that I have after the
> parallel join with parallelism 1.
> When I changed it to .map(new MyMapMeter).setParallelism(p) then when I
> increase the number of parallelism p the completion time decreases, which is
> reasonable. Somehow it was a bottleneck of my parallel execution plan, but I
> had it this way in order to measure a valid average throughput.
> 
> So, my question is the following: 
> 
> How can I measure the average throughput of my parallel join operation
> properly?
> 
> Best,
> Max
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Do timestamps and watermarks exist after window evaluation?

2017-11-09 Thread Aljoscha Krettek
Hi,

This new section in the windowing documentation will help answer your question: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#working-with-window-results
 


Please let me know if you have any further questions. :-)

Best,
Aljoscha

> On 8. Nov 2017, at 18:54, Derek VerLee  wrote:
> 
> When composing ("chaining") multiple windowing operations on the same
> stream are watermarks transmitted down stream after window evaluation,
> and are the records emitted from WindowFunctions given timestamps? Do I
> need to or should I always assignTimestampsAndWatermarks to the
> outputsof window evaluations if I want to window again? If automatically
> assigned, how should I think about them in an event time context? Would
> the event time of a record resulting from a WindowFunction be the
> window's end time in the case of a TimeWindow?
> 
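
As a small illustration of the behaviour that documentation section describes,
a sketch of two chained event-time windows; input is the upstream
DataStream<MyEvent>, and MyEvent, MyKeySelector and MyReduceFunction are
placeholders:

DataStream<MyEvent> perMinute = input
    .keyBy(new MyKeySelector())
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // records emitted here carry the window's end timestamp (end - 1 ms),
    // and watermarks keep flowing downstream
    .reduce(new MyReduceFunction());

DataStream<MyEvent> perHour = perMinute
    .keyBy(new MyKeySelector())
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    // no extra assignTimestampsAndWatermarks call is needed in between
    .reduce(new MyReduceFunction());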



Re: How to best create a bounded session window ?

2017-11-09 Thread Vishal Santoshi
Thanks you for the response.

I would not mind the second scenario, as in a second window, which
your illustration suggests with a custom trigger approach. I am not
certain, though, that triggers define the lifecycle of a window, as in a
trigger firing does not necessarily imply a garbage-collectable window. It
should be GCed only after the watermark exceeds a hypothetically ever-increasing
window leading boundary by a lag. In some cases that might never happen, as
the leading boundary is forever increasing. We may decide to fire_and_purge,
fire, etc., but the window remains live. Or did I get that part wrong?


Vishal.




On Thu, Nov 9, 2017 at 8:24 AM, Piotr Nowojski 
wrote:

> It might be more complicated if you want to take into account events
> coming in out of order. For example you limit length of window to 5 and you
> get the following events:
>
> 1 2 3 4 6 7 8 5
>
> Do you want to emit windows:
>
> [1 2 3 4 5] (length limit exceeded) + [6 7 8] ?
>
> Or are you fine with interleaving windows in case of out of order:
>
> [1 2 3 4 6] + [5 7 8]
>
> If the latter one, some custom Trigger should be enough for you. If not,
> you would need to implement hypothetical MergingAndSplitableWindowAssigner,
> that after encountering late event “5” could split previously created
> windows. Unfortunately such feature is not supported by a WindowOperator,
> so you would have to implement your own operator for this.
>
> Regardless of your option remember to write some integration tests:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/
> testing.html#integration-testing
>
> Piotrek
>
> On 8 Nov 2017, at 21:43, Vishal Santoshi 
> wrote:
>
> I am implementing a bounded session window but I require to short circuit
> the session if the session length ( in count of events or time ) go beyond
> a configured limit , a very reasonable scenario ( bot etc ) . I am using
> the approach as listed. I am not sure though if the Window itself is being
> terminated and if that is even feasible. Any other approach or advise ?
>
> public class BoundedEventTimeTrigger extends Trigger<Object, TimeWindow> {
> private static final long serialVersionUID = 1L;
> long maxSessionTime;
>
> ValueState<Boolean> doneState;
> private final ValueStateDescriptor<Boolean> cleanupStateDescriptor =
> new ValueStateDescriptor<>("done", Boolean.class );
>
> private BoundedEventTimeTrigger(long maxSessionTime) {
> this.maxSessionTime = maxSessionTime;
> }
>
> /**
>  * Creates an event-time trigger that fires once the watermark passes the 
> end of the window.
>  * 
>  * Once the trigger fires all elements are discarded. Elements that 
> arrive late immediately
>  * trigger window evaluation with just this one element.
>  */
> public static BoundedEventTimeTrigger create(long maxSessionLengh) {
> return new BoundedEventTimeTrigger(maxSessionLengh);
> }
>
> @Override
> public TriggerResult onElement(Object element, long timestamp, TimeWindow 
> window, TriggerContext ctx) throws Exception {
> if(cleanupState!=null && cleanupState.value()!=null && 
> cleanupState.value()) {
> return TriggerResult.CONTINUE;
> }
> if(timestamp - window.getStart() > maxSessionTime){
> System.out.println(new Date(timestamp) + "\t" + new 
> Date(window.getStart()));
> try {
> doneState = ctx.getPartitionedState(cleanupStateDescriptor);
> doneState.update(true);
> return TriggerResult.FIRE_AND_PURGE;
> } catch (IOException e) {
> throw new RuntimeException("Failed to update state", e);
> }
> }
>
> if (window.maxTimestamp() <= ctx.getCurrentWatermark() ) {
> // if the watermark is already past the window fire immediately
> return TriggerResult.FIRE;
> } else {
> ctx.registerEventTimeTimer(window.maxTimestamp());
> return TriggerResult.CONTINUE;
> }
> }
>
> @Override
> public TriggerResult onEventTime(long time, TimeWindow window, 
> TriggerContext ctx) {
> return time == window.maxTimestamp() ?
> TriggerResult.FIRE :
> TriggerResult.CONTINUE;
> }
>
> @Override
> public TriggerResult onProcessingTime(long time, TimeWindow window, 
> TriggerContext ctx) throws Exception {
> return TriggerResult.CONTINUE;
> }
>
> @Override
> public void clear(TimeWindow window, TriggerContext ctx) throws Exception 
> {
> ctx.deleteEventTimeTimer(window.maxTimestamp());
> }
>
> @Override
> public boolean canMerge() {
> return true;
> }
>
> @Override
> public void onMerge(TimeWindow window,
> OnMergeContext ctx) {
> ctx.registerEventTimeTimer(window.maxTimestamp());
> }
>
> @Override
> public String toString() {
> re

Re: Serialization in Operator Chaining

2017-11-09 Thread Aljoscha Krettek
Hi,

If you use the DataSet API, there will be no serialisation between operations 
in a chain. If you use the DataStream API, there will be serialisation by 
default but you can disable that using 
executionEnv.getConfig().enableObjectReuse().

Hope that helps,
Aljoscha
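
Concretely, the DataStream API switch mentioned above is one call on the
execution config (a sketch):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Skip the defensive copies between chained operators. Only safe if your
// functions neither cache nor modify input objects after emitting them.
env.getConfig().enableObjectReuse();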

> On 9. Nov 2017, at 13:57, Hicken, Jan  wrote:
> 
> Hi folks,
> 
> I have a question regarding the serialization in Flink's operator
> chaining:
> 
> Consider these two map functions: Map1 and Map2
> 
> As I haven't disabled operator chaining in the environment, these two
> functions will be chained into one operator when executing my job.
> 
> The thing is, that the serialization for objects of type T is quite
> expensive and I'd like to avoid that as much as possible. Does Flink
> actually serialize these objects under the hood even if the functions
> run in the same operator? If so, is it possible to disable the
> serialization somehow?
> 
> Kind regards,
> Jan



Re: When using Flink for CEP, can the data in Cassandra database be used for state

2017-11-09 Thread Kostas Kloudas
Hi Shyla,

Happy to hear that you are experimenting with CEP!

For enriching your input stream with data from Cassandra (or whichever external 
storage system) you could use:
* either the AsyncIO functionality offered by Flink
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html),
* or, iff all your database fits in memory, you could write a ProcessFunction
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html)
which loads the database in memory in the open() method, and then uses the
data accordingly.

Afterwards, you can use the resulting (enriched) DataStream to feed it into CEP 
for further processing.

Hope this helps!
Kostas
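
A rough sketch of the second option, assuming a hypothetical
loadProfilesFromCassandra() helper and simple Event / UserProfile /
EnrichedEvent types of your own:

public class ProfileEnrichmentFunction extends ProcessFunction<Event, EnrichedEvent> {

    private transient Map<String, UserProfile> profilesByUserId;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Load the profile table once per parallel instance; this only works
        // if it comfortably fits in memory.
        profilesByUserId = loadProfilesFromCassandra(); // hypothetical helper
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<EnrichedEvent> out) {
        UserProfile profile = profilesByUserId.get(event.getUserId());
        if (profile != null) {
            out.collect(new EnrichedEvent(event, profile));
        }
    }
}

The enriched stream can then be fed into the CEP pattern as usual.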

> On Nov 9, 2017, at 12:08 AM, shyla deshpande  wrote:
> 
> Hello all,
> 
> I am new to Flink.
> 
> We have our data in Cassandra database. We have a use case for CEP. 
> I am checking out if Flink fits well for us.  
> 
> When processing the event data, I may want to pull data for the cassandra 
> database like the user profile and join with the event data.
> 
> Is there a way to do that?  I appreciate your help. 
> 
> Thanks



Re: How to best create a bounded session window ?

2017-11-09 Thread Piotr Nowojski
It might be more complicated if you want to take into account events coming in 
out of order. For example you limit length of window to 5 and you get the 
following events:

1 2 3 4 6 7 8 5

Do you want to emit windows:

[1 2 3 4 5] (length limit exceeded) + [6 7 8] ?

Or are you fine with interleaving windows in case of out of order:

[1 2 3 4 6] + [5 7 8] 

If the latter one, some custom Trigger should be enough for you. If not, you 
would need to implement hypothetical MergingAndSplitableWindowAssigner, that 
after encountering late event “5” could split previously created windows. 
Unfortunately such feature is not supported by a WindowOperator, so you would 
have to implement your own operator for this.

Regardless of your option remember to write some integration tests:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html#integration-testing
 


Piotrek

> On 8 Nov 2017, at 21:43, Vishal Santoshi  wrote:
> 
> I am implementing a bounded session window but I require to short circuit the 
> session if the session length ( in count of events or time ) go beyond a 
> configured limit , a very reasonable scenario ( bot etc ) . I am using the 
> approach as listed. I am not sure though if the Window itself is being 
> terminated and if that is even feasible. Any other approach or advise ?  
> 
> public class BoundedEventTimeTrigger extends Trigger<Object, TimeWindow> {
> private static final long serialVersionUID = 1L;
> long maxSessionTime;
> 
> ValueState<Boolean> doneState;
> private final ValueStateDescriptor<Boolean> cleanupStateDescriptor =
> new ValueStateDescriptor<>("done", Boolean.class );
> 
> private BoundedEventTimeTrigger(long maxSessionTime) {
> this.maxSessionTime = maxSessionTime;
> }
> 
> /**
>  * Creates an event-time trigger that fires once the watermark passes the 
> end of the window.
>  * 
>  * Once the trigger fires all elements are discarded. Elements that 
> arrive late immediately
>  * trigger window evaluation with just this one element.
>  */
> public static BoundedEventTimeTrigger create(long maxSessionLengh) {
> return new BoundedEventTimeTrigger(maxSessionLengh);
> }
> 
> @Override
> public TriggerResult onElement(Object element, long timestamp, TimeWindow 
> window, TriggerContext ctx) throws Exception {
> if(cleanupState!=null && cleanupState.value()!=null && 
> cleanupState.value()) {
> return TriggerResult.CONTINUE;
> }
> if(timestamp - window.getStart() > maxSessionTime){
> System.out.println(new Date(timestamp) + "\t" + new 
> Date(window.getStart()));
> try {
> doneState = ctx.getPartitionedState(cleanupStateDescriptor);
> doneState.update(true);
> return TriggerResult.FIRE_AND_PURGE;
> } catch (IOException e) {
> throw new RuntimeException("Failed to update state", e);
> }
> }
> 
> if (window.maxTimestamp() <= ctx.getCurrentWatermark() ) {
> // if the watermark is already past the window fire immediately
> return TriggerResult.FIRE;
> } else {
> ctx.registerEventTimeTimer(window.maxTimestamp());
> return TriggerResult.CONTINUE;
> }
> }
> 
> @Override
> public TriggerResult onEventTime(long time, TimeWindow window, 
> TriggerContext ctx) {
> return time == window.maxTimestamp() ?
> TriggerResult.FIRE :
> TriggerResult.CONTINUE;
> }
> 
> @Override
> public TriggerResult onProcessingTime(long time, TimeWindow window, 
> TriggerContext ctx) throws Exception {
> return TriggerResult.CONTINUE;
> }
> 
> @Override
> public void clear(TimeWindow window, TriggerContext ctx) throws Exception 
> {
> ctx.deleteEventTimeTimer(window.maxTimestamp());
> }
> 
> @Override
> public boolean canMerge() {
> return true;
> }
> 
> @Override
> public void onMerge(TimeWindow window,
> OnMergeContext ctx) {
> ctx.registerEventTimeTimer(window.maxTimestamp());
> }
> 
> @Override
> public String toString() {
> return "EventTimeTrigger()";
> }
> }
> 



Serialization in Operator Chaining

2017-11-09 Thread Hicken, Jan
Hi folks,

I have a question regarding the serialization in Flink's operator
chaining:

Consider these two map functions: Map1 and Map2

As I haven't disabled operator chaining in the environment, these two
functions will be chained into one operator when executing my job.

The thing is, that the serialization for objects of type T is quite
expensive and I'd like to avoid that as much as possible. Does Flink
actually serialize these objects under the hood even if the functions
run in the same operator? If so, is it possible to disable the
serialization somehow?

Kind regards,
Jan

signature.asc
Description: This is a digitally signed message part


Re: Generate watermarks per key in a KeyedStream

2017-11-09 Thread Shailesh Jain
Thanks for your reply, Xingcan.

On Wed, Nov 8, 2017 at 10:42 PM, Xingcan Cui  wrote:

> Hi Shailesh,
>
> actually, the watermarks are generated per partition, but all of them will
> be forcibly aligned to the minimum one during processing. That is decided
> by the semantics of watermark and KeyedStream, i.e., the watermarks belong
> to a whole stream and a stream is made up of different partitions (one per
> key).
>
> If the physical devices work in different time systems due to delay, the
> event streams from them should be treated separately.
>
> Hope that helps.
>
> Best,
> Xingcan
>
> On Wed, Nov 8, 2017 at 11:48 PM, Shailesh Jain <
> shailesh.j...@stellapps.com> wrote:
>
>> Hi,
>>
>> I'm working on implementing a use case wherein different physical devices
>> are sending events, and due to network/power issues, there can be a delay
>> in receiving events at Flink source. One of the operators within the flink
>> job is the Pattern operator, and there are certain patterns which are time
>> sensitive, so I'm using Event time characteristic. But the problem comes
>> when there are unpredictable delays in events from a particular device(s),
>> which causes those events to be dropped (as I cannot really define a static
>> bound to allow for lateness).
>>
>> Since I'm using a KeyedStream, keyed on the source device ID, is there a
>> way to allow each CEP operator instance (one per key) to progress its time
>> based on the event time in the corresponding stream partition. Or in other
>> words, is there a way to generate watermarks per partition in a KeyedStream?
>>
>> Thanks,
>> Shailesh
>>
>
>
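
One way to treat the streams separately, as suggested above, is to create one
stream per device (or split by device before watermarking) and assign
watermarks on each of them individually; a sketch using the built-in
bounded-out-of-orderness extractor (DeviceEvent, DeviceSource and the
10-second bound are placeholders):

DataStream<DeviceEvent> deviceA = env
    .addSource(new DeviceSource("device-A"))
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<DeviceEvent>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(DeviceEvent event) {
                return event.getTimestampMillis();
            }
        });
// Repeat per device: each stream then advances its own watermark, and a CEP
// pattern applied to it is not held back by the slowest of the other devices.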


Re: Broadcast to all the other operators

2017-11-09 Thread Ladhari Sadok
Ok thanks Tony, your answer is very helpful.

2017-11-09 11:09 GMT+01:00 Tony Wei :

> Hi Sadok,
>
> The sample code is just an example to show you how to broadcast the rules
> to all subtasks, but the output from CoFlatMap is not necessary to be
> Tuple2. It depends on what you actually need in your Rule
> Engine project.
> For example, if you can apply rule on each record directly, you can emit
> processed records to keyed operator.
> IMHO, the scenario in the article you mentioned is having several
> well-prepared rules to enrich data, and using DSL files to decide what
> rules that incoming event needs. After enriching, the features for the
> particular event will be grouped by its random id and be calculated by the
> models.
> I think this approach might be close to the solution in that article, but
> it could have some difference according to different use cases.
>
> Best Regards,
> Tony Wei
>
>
> 2017-11-09 17:27 GMT+08:00 Ladhari Sadok :
>
>>
>> -- Forwarded message --
>> From: Ladhari Sadok 
>> Date: 2017-11-09 10:26 GMT+01:00
>> Subject: Re: Broadcast to all the other operators
>> To: Tony Wei 
>>
>>
>> Thanks Tony for your very fast answer ,
>>
>> Yes it resolves my problem that way, but with flatMap I will get
>> Tuple2 always in the processing function ( in
>> case of no rules update available and  in the other case ).
>> There is no optimization of this solution ? Do you think it is the same
>> solution in this picture : https://data-artisans.com/wp-c
>> ontent/uploads/2017/10/streaming-in-definitions.png ?
>>
>> Best regards,
>> Sadok
>>
>>
>> Le 9 nov. 2017 9:21 AM, "Tony Wei"  a écrit :
>>
>> Hi Sadok,
>>
>> What I mean is to keep the rules in the operator state. The event in Rule
>> Stream is just the change log about rules.
>> For more specific, you can fetch the rules from Redis in the open step of
>> CoFlatMap and keep them in the operator state, then use Rule Stream to
>> notify the CoFlatMap to 1. update some rules or 2. refetch all rules from
>> Redis.
>> Is that what you want?
>>
>> Best Regards,
>> Tony Wei
>>
>> 2017-11-09 15:52 GMT+08:00 Ladhari Sadok :
>>
>>> Thank you for the answer, I know that solution, but I don't want to
>>> stream the rules all time.
>>> In my case I have the rules in Redis and at startup of flink they are
>>> loaded.
>>>
>>> I want to broadcast changes just when it occurs.
>>>
>>> Thanks.
>>>
>>> Le 9 nov. 2017 7:51 AM, "Tony Wei"  a écrit :
>>>
 Hi Sadok,

 Since you want to broadcast Rule Stream to all subtasks, it seems that
 it is not necessary to use KeyedStream.
 How about use broadcast partitioner, connect two streams to attach the
 rule on each record or imply rule on them directly, and do the key operator
 after that?
 If you need to do key operator and apply the rules, it should work by
 changing the order.

 The code might be something like this, and you can change the rules'
 state in the CoFlatMapFunction.

 DataStream rules = ...;
 DataStream records = ...;
 DataStream> recordWithRule =
 rules.broadcast().connect(records).flatMap(...);
 dataWithRule.keyBy(...).process(...);

 Hope this will make sense to you.

 Best Regards,
 Tony Wei

 2017-11-09 6:25 GMT+08:00 Ladhari Sadok :

> Hello,
>
> I'm working on a Rules Engine project with Flink 1.3. In this project I
> want to update some keyed operator state when an external event occurs.
>
> I have a Datastream of updates (from kafka) I want to broadcast the
> data contained in this stream to all keyed operator so I can change the
> state in all operators.
>
> It is like this use case :
> Image : https://data-artisans.com/wp-content/uploads/2017/10/streami
> ng-in-definitions.png
> All article : https://data-artisans.com/blog
> /real-time-fraud-detection-ing-bank-apache-flink
>
> I found it in the DataSet API but not in the DataStream API!
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/
> dev/batch/index.html#broadcast-variables
>
> Can someone explain to me how to solve this problem?
>
> Thanks a lot.
>
> Flinkly regards,
> Sadok
>


>>
>>
>>
>


Re: Broadcast to all the other operators

2017-11-09 Thread Tony Wei
Hi Sadok,

The sample code is just an example to show you how to broadcast the rules
to all subtasks, but the output from CoFlatMap is not necessary to be
Tuple2. It depends on what you actually need in your Rule
Engine project.
For example, if you can apply rule on each record directly, you can emit
processed records to keyed operator.
IMHO, the scenario in the article you mentioned is having several
well-prepared rules to enrich data, and using DSL files to decide what
rules that incoming event needs. After enriching, the features for the
particular event will be grouped by its random id and be calculated by the
models.
I think this approach might be close to the solution in that article, but
it could have some difference according to different use cases.

Best Regards,
Tony Wei
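
A rough sketch of such a CoFlatMap, keeping the current rules in the operator
and emitting each record together with the rules that apply to it (Rule,
Record and the matching logic are placeholders; snapshotting the rule state is
left out for brevity):

public class ApplyRulesFunction
        extends RichCoFlatMapFunction<Rule, Record, Tuple2<Record, Rule>> {

    private transient Map<String, Rule> currentRules;

    @Override
    public void open(Configuration parameters) {
        currentRules = new HashMap<>(); // could also be pre-loaded from Redis here
    }

    @Override
    public void flatMap1(Rule ruleUpdate, Collector<Tuple2<Record, Rule>> out) {
        // Broadcast side: every parallel instance sees every rule update.
        currentRules.put(ruleUpdate.getId(), ruleUpdate);
    }

    @Override
    public void flatMap2(Record record, Collector<Tuple2<Record, Rule>> out) {
        for (Rule rule : currentRules.values()) {
            if (rule.matches(record)) { // placeholder matching logic
                out.collect(Tuple2.of(record, rule));
            }
        }
    }
}

Used as: rules.broadcast().connect(records).flatMap(new ApplyRulesFunction()).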


2017-11-09 17:27 GMT+08:00 Ladhari Sadok :

>
> -- Forwarded message --
> From: Ladhari Sadok 
> Date: 2017-11-09 10:26 GMT+01:00
> Subject: Re: Broadcast to all the other operators
> To: Tony Wei 
>
>
> Thanks Tony for your very fast answer ,
>
> Yes it resolves my problem that way, but with flatMap I will get
> Tuple2 always in the processing function ( in
> case of no rules update available and  in the other case ).
> There is no optimization of this solution ? Do you think it is the same
> solution in this picture : https://data-artisans.com/wp-c
> ontent/uploads/2017/10/streaming-in-definitions.png ?
>
> Best regards,
> Sadok
>
>
> Le 9 nov. 2017 9:21 AM, "Tony Wei"  a écrit :
>
> Hi Sadok,
>
> What I mean is to keep the rules in the operator state. The event in Rule
> Stream is just the change log about rules.
> For more specific, you can fetch the rules from Redis in the open step of
> CoFlatMap and keep them in the operator state, then use Rule Stream to
> notify the CoFlatMap to 1. update some rules or 2. refetch all rules from
> Redis.
> Is that what you want?
>
> Best Regards,
> Tony Wei
>
> 2017-11-09 15:52 GMT+08:00 Ladhari Sadok :
>
>> Thank you for the answer, I know that solution, but I don't want to
>> stream the rules all time.
>> In my case I have the rules in Redis and at startup of flink they are
>> loaded.
>>
>> I want to broadcast changes just when it occurs.
>>
>> Thanks.
>>
>> Le 9 nov. 2017 7:51 AM, "Tony Wei"  a écrit :
>>
>>> Hi Sadok,
>>>
>>> Since you want to broadcast Rule Stream to all subtasks, it seems that
>>> it is not necessary to use KeyedStream.
>>> How about use broadcast partitioner, connect two streams to attach the
>>> rule on each record or imply rule on them directly, and do the key operator
>>> after that?
>>> If you need to do key operator and apply the rules, it should work by
>>> changing the order.
>>>
>>> The code might be something like this, and you can change the rules'
>>> state in the CoFlatMapFunction.
>>>
>>> DataStream rules = ...;
>>> DataStream records = ...;
>>> DataStream> recordWithRule =
>>> rules.broadcast().connect(records).flatMap(...);
>>> dataWithRule.keyBy(...).process(...);
>>>
>>> Hope this will make sense to you.
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2017-11-09 6:25 GMT+08:00 Ladhari Sadok :
>>>
 Hello,

 I'm working on a Rules Engine project with Flink 1.3. In this project I
 want to update some keyed operator state when an external event occurs.

 I have a Datastream of updates (from kafka) I want to broadcast the
 data contained in this stream to all keyed operator so I can change the
 state in all operators.

 It is like this use case :
 Image : https://data-artisans.com/wp-content/uploads/2017/10/streami
 ng-in-definitions.png
 All article : https://data-artisans.com/blog
 /real-time-fraud-detection-ing-bank-apache-flink

 I found it in the DataSet API but not in the DataStream API!

 https://ci.apache.org/projects/flink/flink-docs-release-1.3/
 dev/batch/index.html#broadcast-variables

 Can someone explain to me how to solve this problem?

 Thanks a lot.

 Flinkly regards,
 Sadok

>>>
>>>
>
>
>


Re: Job Manager Configuration

2017-11-09 Thread Till Rohrmann
That is the question I hope to be able to answer with the logs. Let's see
what they say.

Cheers,
Till

On Wed, Nov 8, 2017 at 7:24 PM, Chan, Regina  wrote:

> Thanks for the responses!
>
>
>
> I’m currently using 1.2.0 – going to bump it up once I have things
> stabilized. I haven’t defined any slot sharing groups but I do think that
> I’ve probably got my job configured sub-optimally. I’ve refactored my code
> so that I can submit subsets of the flow at a time and it seems to work.
> The break between the JobManager being able to acknowledge the job and not
> seems to hover somewhere between 10 and 20 flows.
>
>
>
> I guess what doesn’t make too much sense to me is this: if the user code is
> uploaded once to the JobManager and downloaded by each TaskManager, what
> exactly is the JobManager doing that’s keeping it busy? It’s the same code
> across the TaskManagers.
>
>
>
> I’ll get you the logs shortly.
>
>
>
> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
> *Sent:* Wednesday, November 08, 2017 10:17 AM
> *To:* Chan, Regina [Tech]
> *Cc:* Chesnay Schepler; user@flink.apache.org
>
> *Subject:* Re: Job Manager Configuration
>
>
>
> Quick question, Regina: which version of Flink are you running?
>
>
>
> Cheers,
> Till
>
>
>
> On Tue, Nov 7, 2017 at 4:38 PM, Till Rohrmann 
> wrote:
>
> Hi Regina,
>
>
>
> the user code is uploaded once to the `JobManager` and then downloaded by
> each `TaskManager` once, when it first receives the command to execute
> the first task of your job.
>
>
>
> As Chesnay said, there is no fundamental limit on the size of a
> Flink job. However, it might be the case that you have configured your job
> sub-optimally. You said that you have 300 parallel flows. If you haven't
> defined separate slot sharing groups for them, parallel subtasks of all
> 300 flows may end up sharing the same slots (the default slot sharing
> group). Depending on
> what you calculate, this can be inefficient because the individual tasks
> don't get much computation time. Moreover, all tasks will allocate some
> objects on the heap, which can lead to more GC. Therefore, it might make
> sense to group some of the flows together and run them in batches, starting
> each batch after the previous one completes. But this is hard to say without
> knowing the details of your job and getting a glimpse at the JobManager logs.
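>
> For illustration, assuming this is a DataStream job, a separate slot sharing
> group per flow could be assigned roughly like this (a sketch only; numFlows
> and the createSource/createMapper/createSink helpers are placeholders for
> your own builders):
>
> // Give each flow its own slot sharing group so that its subtasks are not
> // packed into the same slots as every other flow's subtasks.
> for (int i = 0; i < numFlows; i++) {
>     String group = "flow-" + i;
>     env.addSource(createSource(i)).slotSharingGroup(group)
>        .map(createMapper(i)).slotSharingGroup(group)
>        .addSink(createSink(i)).slotSharingGroup(group);
> }
>
> Note that downstream operators inherit the slot sharing group of their inputs
> by default, so setting it on the source of each flow is usually enough.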
>
>
>
> Concerning the exception you're seeing, it would also be helpful to see
> the logs of the client and the JobManager. Actually, the scheduling of the
> job is independent of the response. Only the creation of the ExecutionGraph
> and making the JobGraph highly available in case of an HA setup are
> executed before the JobManager acknowledges the job submission. Only if
> this acknowledgement message is not received in time on the client side is
> the SubmissionTimeoutException thrown. Therefore, I assume that somehow
> the JobManager is too busy or kept from sending the acknowledgement.
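>
> As a side note: if you merely want to rule out a plain client-side timeout
> while we investigate, you can raise the client-side ask timeout in
> flink-conf.yaml, e.g. (a sketch; adjust the value to your setup):
>
> akka.client.timeout: 1800 s
>
> This only delays the SubmissionTimeoutException, of course; it does not
> address why the acknowledgement is slow in the first place.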
>
>
>
> Cheers,
>
> Till
>
>
>
>
>
>
>
> On Thu, Nov 2, 2017 at 7:18 PM, Chan, Regina  wrote:
>
> Does it copy per TaskManager or per operator? I only gave it 10
> TaskManagers with 2 slots. I’m perfectly fine with it queuing up and
> running when it has the resources to.
>
>
>
>
>
>
>
> *From:* Chesnay Schepler [mailto:ches...@apache.org]
> *Sent:* Wednesday, November 01, 2017 7:09 AM
> *To:* user@flink.apache.org
> *Subject:* Re: Job Manager Configuration
>
>
>
> AFAIK there is no theoretical limit on the size of the plan; it just
> depends on the available resources.
>
>
>
> The job submission times out since it takes too long to deploy all the
> operators that the job defines. With 300 flows, each with 6 operators,
> you're looking at potentially (1800 * parallelism) tasks that have to be
> deployed. For each task Flink copies the user code of *all* flows to the
> executing TaskManager, which the network may just not be able to handle in time.
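>
> (To put a number on it: even at parallelism 1 that is already 300 * 6 = 1800
> tasks the JobManager has to schedule and deploy for a single submission; at
> parallelism p it is 1800 * p.)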
>
> I suggest splitting your job into smaller batches, or even running each of
> them independently.
>
> On 31.10.2017 16:25, Chan, Regina wrote:
>
> One additional question: what is the largest plan that the
> JobManager can handle? Is there a limit? My flows don’t need to run in
> parallel and can run independently. I wanted them to run in one single job
> because it’s part of one logical commit on my side.
>
>
>
> Thanks,
>
> Regina
>
>
>
> *From:* Chan, Regina [Tech]
> *Sent:* Monday, October 30, 2017 3:22 PM
> *To:* 'user@flink.apache.org'
> *Subject:* Job Manager Configuration
>
>
>
> Flink Users,
>
>
>
> I have about 300 parallel flows in one job, each with 2 inputs, 3
> operators, and 1 sink, which makes for a large job. I keep getting the below
> timeout exception even though I’ve already set a 30-minute timeout and a
> 6GB heap on the JobManager. Is there a heuristic for configuring the
> JobManager better?
>
>
>
> Caused by: 
> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
> Job submission to the J

Fwd: Broadcast to all the other operators

2017-11-09 Thread Ladhari Sadok
-- Forwarded message --
From: Ladhari Sadok 
Date: 2017-11-09 10:26 GMT+01:00
Subject: Re: Broadcast to all the other operators
To: Tony Wei 


Thanks Tony for your very fast answer,

Yes, it resolves my problem that way, but with flatMap I will always get a
Tuple2 in the processing function (with a null rule element when no rule
update is available, and the rule in the other case).
Is there no way to optimize this solution? Do you think it is the same
solution as in this picture: https://data-artisans.com/wp-content/uploads/2017/10/streaming-in-definitions.png ?

Best regards,
Sadok


On 9 Nov. 2017 at 9:21 AM, "Tony Wei" wrote:

Hi Sadok,

What I mean is to keep the rules in the operator state. The events in the Rule
Stream are just the change log about the rules.
To be more specific, you can fetch the rules from Redis in the open step of
the CoFlatMap and keep them in the operator state, then use the Rule Stream to
notify the CoFlatMap to either 1. update some rules or 2. refetch all rules from
Redis.
Is that what you want?

Best Regards,
Tony Wei

2017-11-09 15:52 GMT+08:00 Ladhari Sadok :

> Thank you for the answer. I know that solution, but I don't want to stream
> the rules all the time.
> In my case I have the rules in Redis, and they are loaded at the startup
> of Flink.
>
> I want to broadcast changes only when they occur.
>
> Thanks.
>
> On 9 Nov. 2017 at 7:51 AM, "Tony Wei" wrote:
>
>> Hi Sadok,
>>
>> Since you want to broadcast the Rule Stream to all subtasks, it seems that it
>> is not necessary to use a KeyedStream.
>> How about using the broadcast partitioner: connect the two streams to attach
>> the rule to each record (or apply the rule to it directly), and do the keyBy
>> after that?
>> If you need to do the keyBy and then apply the rules, it should work by
>> changing the order.
>>
>> The code might be something like this, and you can change the rules'
>> state in the CoFlatMapFunction.
>>
>> DataStream<Rule> rules = ...;
>> DataStream<Record> records = ...;
>> DataStream<Tuple2<Rule, Record>> recordWithRule =
>>     rules.broadcast().connect(records).flatMap(...);
>> recordWithRule.keyBy(...).process(...);
>>
>> Hope this will make sense to you.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2017-11-09 6:25 GMT+08:00 Ladhari Sadok :
>>
>>> Hello,
>>>
>>> I'm working on a Rules Engine project with Flink 1.3. In this project I
>>> want to update some keyed operator state when an external event occurs.
>>>
>>> I have a DataStream of updates (from Kafka) whose data I want to broadcast
>>> to all keyed operators so I can change the state in
>>> all operators.
>>>
>>> It is like this use case:
>>> Image: https://data-artisans.com/wp-content/uploads/2017/10/streaming-in-definitions.png
>>> Full article: https://data-artisans.com/blog/real-time-fraud-detection-ing-bank-apache-flink
>>>
>>> I found it in the DataSet API but not in the DataStream API!
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#broadcast-variables
>>>
>>> Can someone explain to me how to solve this problem?
>>>
>>> Thanks a lot.
>>>
>>> Flinkly regards,
>>> Sadok
>>>
>>
>>


Correlation between data streams/operators and threads

2017-11-09 Thread Shailesh Jain
Hi,

I'm trying to understand the runtime aspect of Flink when dealing with
multiple data streams and multiple operators per data stream.

Use case: N data streams in a single Flink job (each data stream
representing 1 device, with different time latencies), and each of these
data streams gets split into two streams, of which one goes into a bunch of
CEP operators, and one into a process function.

Questions:
1. At runtime, will the engine create one thread per data stream? Or one
thread per operator?
2. Is it possible to dynamically create a data stream at runtime when the
job starts? (i.e. if N is read from a file when the job starts and N
corresponding streams need to be created; see the sketch after this list)
3. Are there any specific performance impacts when a large number of
streams (N ~ 1) are created, as opposed to N partitions within a single
stream?
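
To make question 2 concrete, here is a rough sketch of what I mean (the helper
names, the deviceId field, and the ProcessFunction are made up; N is only known
when the job graph is built, i.e. before env.execute(), not while the job is
already running):

// Build N independent per-device pipelines while constructing the job graph.
int n = readDeviceCountFromFile("devices.txt");        // hypothetical helper
for (int i = 0; i < n; i++) {
    env.addSource(createSourceForDevice(i))            // hypothetical per-device source
       .keyBy("deviceId")                              // assumes the events are POJOs with a deviceId field
       .process(new PerDeviceProcessFunction());       // hypothetical ProcessFunction
}
env.execute("per-device pipelines");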

Are there any internal (design) documents which can help understanding the
implementation details? Any references to the source will also be really
helpful.

Thanks in advance.

Shailesh


Re: Broadcast to all the other operators

2017-11-09 Thread Tony Wei
Hi Sadok,

What I mean is to keep the rules in the operator state. The events in the Rule
Stream are just the change log about the rules.
To be more specific, you can fetch the rules from Redis in the open step of
the CoFlatMap and keep them in the operator state, then use the Rule Stream to
notify the CoFlatMap to either 1. update some rules or 2. refetch all rules from
Redis.
Is that what you want?
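
For example, a minimal sketch of such a CoFlatMapFunction (the Rule, RuleChange
and Record types and the fetchAllRulesFromRedis() helper are made up, and
checkpointing of the rule map via operator state is left out for brevity):

public class RuleEnricher
        extends RichCoFlatMapFunction<RuleChange, Record, Tuple2<Rule, Record>> {

    // Rules currently known to this subtask; every subtask holds the full set
    // because the rule stream is broadcast.
    private transient Map<String, Rule> rules;

    @Override
    public void open(Configuration parameters) {
        rules = new HashMap<>(fetchAllRulesFromRedis());   // initial load from Redis
    }

    @Override
    public void flatMap1(RuleChange change, Collector<Tuple2<Rule, Record>> out) {
        // Rule stream: apply the change log, or refetch everything from Redis.
        if (change.isFullRefresh()) {
            rules.clear();
            rules.putAll(fetchAllRulesFromRedis());
        } else {
            rules.put(change.getRuleId(), change.getRule());
        }
    }

    @Override
    public void flatMap2(Record record, Collector<Tuple2<Rule, Record>> out) {
        // Data stream: attach the matching rule (if any) to each record.
        out.collect(Tuple2.of(rules.get(record.getRuleId()), record));
    }
}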

Best Regards,
Tony Wei

2017-11-09 15:52 GMT+08:00 Ladhari Sadok :

> Thank you for the answer. I know that solution, but I don't want to stream
> the rules all the time.
> In my case I have the rules in Redis, and they are loaded at the startup
> of Flink.
>
> I want to broadcast changes only when they occur.
>
> Thanks.
>
> On 9 Nov. 2017 at 7:51 AM, "Tony Wei" wrote:
>
>> Hi Sadok,
>>
>> Since you want to broadcast the Rule Stream to all subtasks, it seems that it
>> is not necessary to use a KeyedStream.
>> How about using the broadcast partitioner: connect the two streams to attach
>> the rule to each record (or apply the rule to it directly), and do the keyBy
>> after that?
>> If you need to do the keyBy and then apply the rules, it should work by
>> changing the order.
>>
>> The code might be something like this, and you can change the rules'
>> state in the CoFlatMapFunction.
>>
>> DataStream<Rule> rules = ...;
>> DataStream<Record> records = ...;
>> DataStream<Tuple2<Rule, Record>> recordWithRule =
>>     rules.broadcast().connect(records).flatMap(...);
>> recordWithRule.keyBy(...).process(...);
>>
>> Hope this will make sense to you.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2017-11-09 6:25 GMT+08:00 Ladhari Sadok :
>>
>>> Hello,
>>>
>>> I'm working on a Rules Engine project with Flink 1.3. In this project I
>>> want to update some keyed operator state when an external event occurs.
>>>
>>> I have a DataStream of updates (from Kafka) whose data I want to broadcast
>>> to all keyed operators so I can change the state in
>>> all operators.
>>>
>>> It is like this use case:
>>> Image: https://data-artisans.com/wp-content/uploads/2017/10/streaming-in-definitions.png
>>> Full article: https://data-artisans.com/blog/real-time-fraud-detection-ing-bank-apache-flink
>>>
>>> I found it in the DataSet API but not in the DataStream API!
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#broadcast-variables
>>>
>>> Can someone explain to me how to solve this problem?
>>>
>>> Thanks a lot.
>>>
>>> Flinkly regards,
>>> Sadok
>>>
>>
>>