Re: Flink 1.5 job distribution over cluster nodes

2018-07-18 Thread Vishal Santoshi
For example state size / logs etc all these are now one one physical node (
or couple rather then spread out )  for that one pipe where we desire to
have a large state.  We have decreased the slots ( pretty artificial a set
up ) to give a node less stress. I wish there was a RFC from the wider
community on this, IMveryHO.

On Wed, Jul 18, 2018 at 9:43 AM, Vishal Santoshi 
wrote:

> I think there is something to be said about making this distribution more
> flexible.  A stand alone cluster, still the distribution mechanism for many
> a installations suffers horribly with the above approach. A healthy cluster
> requires resources wot be used equitably is possible. I have some pipes
> that are CPU intense and some not so much. Slots being the primary unit of
> parallelism does not differentiate that much between a stress inducing pipe
> or other wise and thus having them spread out becomes essential to avoid
> skews.
>
> Maybe a simple setting could have forked different approaches to slot
> distribution. standalone = true for example.  I have a n node cluster with
> 2 nodes being heavily stressed ( LA approaching the slot size ) b'coz of
> this new setup and that does not seem right. Ot is still a physical node
> that that these process run on with local drives and so on.
>
>
>
>
>
>
> On Wed, Jul 18, 2018, 7:54 AM scarmeli  wrote:
>
>> Answered in another mailing list
>>
>> Hi Shachar,
>>
>> with Flink 1.5 we added resource elasticity. This means that Flink is now
>> able to allocate new containers on a cluster management framework like
>> Yarn
>> or Mesos. Due to these changes (which also apply to the standalone mode),
>> Flink no longer reasons about a fixed set of TaskManagers because if
>> needed
>> it will start new containers (does not work in standalone mode).
>> Therefore,
>> it is hard for the system to make any decisions about spreading slots
>> belonging to a single job out across multiple TMs. It gets even harder
>> when
>> you consider that some jobs like yours might benefit from such a strategy
>> whereas others would benefit from co-locating its slots. It gets even
>> more
>> complicated if you want to do scheduling wrt to multiple jobs which the
>> system does not have full knowledge about because they are submitted
>> sequentially. Therefore, Flink currently assumes that slots requests can
>> be
>> fulfilled by any TaskManager.
>>
>> Cheers,
>> Till
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>


Re: Why data didn't enter the time window in EventTime mode

2018-07-18 Thread Hequn Cheng
Hi Soheil,

> wait 8 milliseconds (according to my code) to see if any other data with
the same key received or not and after 8 millisecond it will be triggered.
Yes, but the time is event time, so if there is no data from source the
time won't be advanced.

There are some reasons why the event time has not been advanced:
1. There are no data from the source
2. One of the source parallelisms doesn't have data
3. The time field, i.e, Long in Tuple3, should be millisecond instead of
second.
4. Data should cover a longer time spam than the window size to advance the
event time.

Best, Hequn

On Wed, Jul 18, 2018 at 3:53 PM, Soheil Pourbafrani 
wrote:

> Hi,
>
> In a datastream processing problem, the source generated data every 8
> millisecond and timestamp is a field of the data. In default Flink time
> behavior data enter the time window but when I set Flink time to EventTime
> it will output nothing! Here is the code:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> SingleOutputStreamOperator> res = 
> aggregatedTuple
> .assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor JSONObject>>(Time.milliseconds(8)) {
>
> @Override
> public long extractTimestamp(Tuple3 
> element) {
> return element.f1 ;
> }
> }).keyBy(1).timeWindow(Time.milliseconds(8))
> .allowedLateness(Time.milliseconds(3))
> .sideOutputLateData(lateOutputTag)
> .reduce(processing...);
> DataStream> lateData = 
> res.getSideOutput(lateOutputTag);
> res.print();
>
> What is the problem with my code?
> According to the Flink documents, my understanding about EventTime is that
> for example in case of time window when a new data received it start a new
> (logical window) based on new data event timestamp and wait 8 milliseconds
> (according to my code) to see if any other data with the same key received
> or not and after 8 millisecond (from timestamp of the first element of the
> window) it will be triggered. Since data source generated data in a
> constant periodic interval, I set a watermarck of  8, too. Is my
> understanding about Flink window in EventTime correct?
>


Re: Question about the behavior of TM when it lost the zookeeper client session in HA mode

2018-07-18 Thread Ron Crocker
I just stumbled on this same problem without any associated ZK issues. We had a 
Kafka broker fail that caused this issue:

2018-07-18 02:48:13,497 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Produce: 
 (2/4) (7e7d61b286d90c51bbd20a15796633f2) switched from 
RUNNING to FAILED.
java.lang.Exception: Failed to send data to Kafka: The server disconnected 
before a response was received.
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:288)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NetworkException: The server 
disconnected before a response was received.

This is the kind of error we should be robust to - the Kafka cluster will 
(reasonably quickly) recover and give a new broker for a particular partition 
(in this case, partition #2). Maybe retries should be the default 
configuration? I believe the client uses the Kafka defaults (acks=0, 
retries=0), but we typically run with acks=1 (or all) and retries=MAX_INT. Do I 
need to do anything more than that to get a more robust producer?

Ron

> On May 16, 2018, at 7:45 PM, Tony Wei  wrote:
> 
> Hi Ufuk, Piotr
> 
> Thanks for all of your replies. I knew that jobs are cancelled if the JM 
> looses the connection to ZK, but JM didn't loose connection in my case.
> My job failed because of the exception from KafkaProducer. However, it 
> happened before and after that exception that TM lost ZK connection.
> So, as Piotr said, it looks like an error in Kafka producer and I will pay 
> more attention on it to see if there is something unexpected happens again.
> 
> Best Regards,
> Tony Wei
> 
> 2018-05-15 19:56 GMT+08:00 Piotr Nowojski  >:
> Hi,
> 
> It looks like there was an error in asynchronous job of sending the records 
> to Kafka. Probably this is a collateral damage of loosing connection to 
> zookeeper. 
> 
> Piotrek
> 
>> On 15 May 2018, at 13:33, Ufuk Celebi > > wrote:
>> 
>> Hey Tony,
>> 
>> thanks for the detailed report.
>> 
>> - In Flink 1.4, jobs are cancelled if the JM looses the connection to ZK and 
>> recovered when the connection is re-established (and one JM becomes leader 
>> again).
>> 
>> - Regarding the KafkaProducer: I'm not sure from the log message whether 
>> Flink closes the KafkaProducer because the job is cancelled or because there 
>> is a connectivity issue to the Kafka cluster. Including Piotr (cc) in this 
>> thread who has worked on the KafkaProducer in the past. If it is a 
>> connectivity issue, it might also explain why you lost the connection to ZK.
>> 
>> Glad to hear that everything is back to normal. Keep us updated if something 
>> unexpected happens again.
>> 
>> – Ufuk
>> 
>> 
>> On Tue, May 15, 2018 at 6:28 AM, Tony Wei > > wrote:
>> Hi all,
>> 
>> I restarted the cluster and changed the log level to DEBUG, and raised the 
>> parallelism of my streaming job from 32 to 40.
>> However, the problem just disappeared and I don't know why.
>> I will remain these settings for a while. If the error happen again, I will 
>> bring more informations back for help. Thank you.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-05-14 14:24 GMT+08:00 Tony Wei > >:
>> Hi all,
>> 
>> After I changed the `high-availability.zookeeper.client.session-timeout` and 
>> `maxSessionTimeout` to 12ms, the exception still occurred.
>> 
>> Here is the log snippet. It seems this is nothing to do with zookeeper 
>> client timeout, but I still don't know why kafka producer would be closed 
>> without any task state changed.
>> 
>> ```
>> 2018-05-14 05:18:53,468 WARN  
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client 
>> session timed out, have not heard from server in 82828ms for sessionid 
>> 0x305f957eb8d000a
>> 2018-05-14 05:18:53,468 INFO  
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client 
>> session timed out, have not heard from server in 82828ms for sessionid 
>> 0x305f957eb8d000a, closing socket connection and attempting reconnect
>> 2018-05-14 05:18:53,571 INFO  
>> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>>   - State change: SUSPENDED
>> 2018-05-14 05:18:53,574 WARN  
>> 

Re: Flink resource manager unable to connect to mesos after restart

2018-07-18 Thread Gary Yao
Hi,

If you are able to re-produce this reliably, can you post the jobmanager
logs?

Best,
Gary

On Wed, Jul 18, 2018 at 10:33 AM, Renjie Liu 
wrote:

> Hi, all:
>
> I'm testing flink 1.5.0 and find that flink mesos resource manager unable
> to connect to mesos after restart. Have you seen this happenen?
> --
> Liu, Renjie
> Software Engineer, MVAD
>


Re: Race between window assignment and same window timeout

2018-07-18 Thread Andrey Zagrebin
Hi Shay,

I would suggest to try Allowed Lateness, like you mention 500 ms:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#allowed-lateness
 

It might also work for processing time.

Cheers,
Andrey

> On 18 Jul 2018, at 17:22, Shimony, Shay  wrote:
> 
> Hi,
>  
> It seems like we encounter a race situation between the aggregation thread 
> and the Time Trigger thread.
> It might not be a bug, but it still seems strange to us, and we would like 
> your help to fix it/work around it please.
>  
> First, few descriptions about our use case and system:
> · We are working with processing time.
> · We are using Flink 1.4.
> · We use our customized sliding window of size 1 minute, slide 10 
> seconds. 
> But we think it can happen also in tumbling window. So for simplicity, let’s 
> assume tumbling window of 1 minute.
> · Our window Trigger does FIRE upon each element.
> · We have constant 2k/sec incoming messages, balanced rate.
> · When I say “window state” I mean simply our aggregation value in it.
>  
> If the timestamp of an element is very close to the end of the window, then 
> it will be assigned with that window of course, but it occasionally happen 
> that this window is timing out and cleared – before this element is 
> aggregated with the window state, thus we lost the previous aggregation value 
> and got new aggregation state with the element value.
>  
> Below is the story as seen by the threads. 
> Timestamps are logical.
>  
> Suppose we are in the beginning of WindowOperator.processElement.
> Current time: 119 (nearly 120)
>  
> Reducer thread
> Time Trigger thread
> Assign element to window [60, 120],
> because context.getCurrentProcessingTime()
> Returned 119 (in assignWindows)
>  
>  
> Time is 120 à clear window state
> Add the element value to window state [60, 120] (it starts from new state)
>  
>  
> Our questions:
> 1.   Is it a legitimate race? (We expected that (1) assigning element to 
> a window + aggregating it to its state, and (2) clearing the window – would 
> be atomic to each other – that is, if an element is valid for a window, then 
> it will be assigned to it and aggregated fully into its state, and only then 
> window clear can happen).
> 2.   How could we make the Time Trigger thread wait a little bit with the 
> window cleaning? Like adding 500ms to clean window time schedule.
> We thought to override WindowOperator.cleanupTime, so is it possible to 
> easily replace WindowOperator with ours?
> 3.   Maybe you have different idea to work around it?
>  
> Thanks!
> Shay



Re: Serialization questions

2018-07-18 Thread Andrey Zagrebin
Another way would be also to make `EntitonAtom` extend 
`org.apache.flink.types.Value` and implement `IOReadableWritable` using custom 
(Kryo) serialisation.

> On 18 Jul 2018, at 18:27, Andrey Zagrebin  wrote:
> 
> Hi Flavio,
> 
> According to the current implementation of `disableGenericTypes`, the 
> exception you get should be valid because Kryo still has to be used for 
> `EntitonAtom` which might be classified as generic (non-serialisable by 
> Flink). You cannot specify exceptions for this check at the moment.
> 
> If you want to have control for which classes Kryo can be used and still 
> activate `disableGenericTypes`, currently you can create your own type info 
> where you provide your own Flink serialiser (extends TypeSerializer) which 
> can internally use Kryo (KryoSerializer).
> You can do it using class annotation:
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/types_serialization.html#defining-type-information-using-a-factory
>  
> 
> This way Flink should not treat it as generic a type.
> 
> Cheers,
> Andrey
> 
>> On 18 Jul 2018, at 11:26, Flavio Pompermaier > > wrote:
>> 
>> Hi Minglei,
>> using the registerTypeWithKryoSerializer with the 3 classes works (without 
>> disableGenericTypes) but the problem is that I would like to avoid Kryo 
>> serialization if this is useful to speedup the job performance,
>> and thus I'd like to be able to run all jobs with disableGenericTypes. 
>> 
>> Best,
>> Flavio
>> 
>> On Wed, Jul 18, 2018 at 11:10 AM, zhangminglei <18717838...@163.com 
>> > wrote:
>> Hi, Flavio
>> 
>>> addDefaultKryoSerializer differs from registerTypeWithKryoSerializer 
>>> because addDefaultKryoSerializer use the passed serializer also for 
>>> subclasses of the configured class. Am I right? This is not very clear in 
>>> the method's Javadoc…
>> 
>> I think it is not exactly a problem with flink. Instead of a kryo problem. 
>> For example, addDefaultKryoSerializer corresponding to the 
>> addDefaultSerializer(int[].class, IntArraySerializer.class) in kryo, whereas 
>> registerTypeWithKryoSerializer corresponding to the register(int.class, new 
>> IntSerializer()) in kryo.With register, you explicitly assign an id for that 
>> type plus serializer. The default serializer just tells kryo which 
>> serializer to use when this type has to be serialized, kryo will then 
>> implicitly register the serializer. And the advantage of using register 
>> would be [1]. when setting setRegistrationRequired(true), which is 
>> recommended (and will be the default in 5.0), you'd have to register every 
>> occurring type explicitly.
>> 
>>> how to avoid that exception?
>> 
>> You can try below and do not make disableGenericTypes and see what happens.
>> 
>> env.registerTypeWithKryoSerializer(DateTime.class, 
>> JodaDateTimeSerializer.class);
>> env.registerTypeWithKryoSerializer(EntitonAtom.class, TBaseSerializer.class);
>> env.registerTypeWithKryoSerializer(EntitonQuad.class, TBaseSerializer.class);
>> 
>> 
>> [1] 
>> https://github.com/EsotericSoftware/kryo/blob/master/README.md#registration 
>> 
>>  
>> 
>> Cheers
>> Minglei
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>>> 在 2018年7月17日,下午9:00,Flavio Pompermaier >> > 写道:
>>> 
>>> Hi to all,
>>> I was trying to check whether our jobs are properly typed or not.
>>> I've started disabling generic types[1] in order to discover untyped 
>>> transformations and so I added the proper returns() to operators.
>>> 
>>> Unfortunately there are jobs where we serialize Thrift and DateTime 
>>> objects, so I need to properly configure the serializers in the 
>>> ExecutionEnvironment:
>>> 
>>> env.registerTypeWithKryoSerializer(DateTime.class, 
>>> JodaDateTimeSerializer.class);
>>> env.getConfig().addDefaultKryoSerializer(EntitonAtom.class, 
>>> TBaseSerializer.class);
>>> env.getConfig().addDefaultKryoSerializer(EntitonQuad.class, 
>>> TBaseSerializer.class);
>>> 
>>> Those jobs don't work when I disable generic types and I get the following 
>>> exception:
>>> 
>>> Exception in thread "main" java.lang.UnsupportedOperationException: Generic 
>>> types have been 
>>> disabled in the ExecutionConfig and type xxx.EntitonAtom is treated as a 
>>> generic type.
>>> 
>>>  I have a couple of questions:
>>> addDefaultKryoSerializer differs from registerTypeWithKryoSerializer 
>>> because addDefaultKryoSerializer use the passed serializer also for 
>>> subclasses of the configured class. Am I right? This is not very clear in 
>>> the method's Javadoc...
>>> how to avoid that exception?
>>> Best,
>>> Flavio
>>> 
>>> [1] env.getConfig().disableGenericTypes();
>> 
>> 
>> 
> 



Re: Serialization questions

2018-07-18 Thread Andrey Zagrebin
Hi Flavio,

According to the current implementation of `disableGenericTypes`, the exception 
you get should be valid because Kryo still has to be used for `EntitonAtom` 
which might be classified as generic (non-serialisable by Flink). You cannot 
specify exceptions for this check at the moment.

If you want to have control for which classes Kryo can be used and still 
activate `disableGenericTypes`, currently you can create your own type info 
where you provide your own Flink serialiser (extends TypeSerializer) which can 
internally use Kryo (KryoSerializer).
You can do it using class annotation:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/types_serialization.html#defining-type-information-using-a-factory
 

This way Flink should not treat it as generic a type.

Cheers,
Andrey

> On 18 Jul 2018, at 11:26, Flavio Pompermaier  wrote:
> 
> Hi Minglei,
> using the registerTypeWithKryoSerializer with the 3 classes works (without 
> disableGenericTypes) but the problem is that I would like to avoid Kryo 
> serialization if this is useful to speedup the job performance,
> and thus I'd like to be able to run all jobs with disableGenericTypes. 
> 
> Best,
> Flavio
> 
> On Wed, Jul 18, 2018 at 11:10 AM, zhangminglei <18717838...@163.com 
> > wrote:
> Hi, Flavio
> 
>> addDefaultKryoSerializer differs from registerTypeWithKryoSerializer because 
>> addDefaultKryoSerializer use the passed serializer also for subclasses of 
>> the configured class. Am I right? This is not very clear in the method's 
>> Javadoc…
> 
> I think it is not exactly a problem with flink. Instead of a kryo problem. 
> For example, addDefaultKryoSerializer corresponding to the 
> addDefaultSerializer(int[].class, IntArraySerializer.class) in kryo, whereas 
> registerTypeWithKryoSerializer corresponding to the register(int.class, new 
> IntSerializer()) in kryo.With register, you explicitly assign an id for that 
> type plus serializer. The default serializer just tells kryo which serializer 
> to use when this type has to be serialized, kryo will then implicitly 
> register the serializer. And the advantage of using register would be [1]. 
> when setting setRegistrationRequired(true), which is recommended (and will be 
> the default in 5.0), you'd have to register every occurring type explicitly.
> 
>> how to avoid that exception?
> 
> You can try below and do not make disableGenericTypes and see what happens.
> 
> env.registerTypeWithKryoSerializer(DateTime.class, 
> JodaDateTimeSerializer.class);
> env.registerTypeWithKryoSerializer(EntitonAtom.class, TBaseSerializer.class);
> env.registerTypeWithKryoSerializer(EntitonQuad.class, TBaseSerializer.class);
> 
> 
> [1] 
> https://github.com/EsotericSoftware/kryo/blob/master/README.md#registration 
>  
> 
> Cheers
> Minglei
> 
> 
> 
> 
> 
> 
> 
> 
>> 在 2018年7月17日,下午9:00,Flavio Pompermaier > > 写道:
>> 
>> Hi to all,
>> I was trying to check whether our jobs are properly typed or not.
>> I've started disabling generic types[1] in order to discover untyped 
>> transformations and so I added the proper returns() to operators.
>> 
>> Unfortunately there are jobs where we serialize Thrift and DateTime objects, 
>> so I need to properly configure the serializers in the ExecutionEnvironment:
>> 
>> env.registerTypeWithKryoSerializer(DateTime.class, 
>> JodaDateTimeSerializer.class);
>> env.getConfig().addDefaultKryoSerializer(EntitonAtom.class, 
>> TBaseSerializer.class);
>> env.getConfig().addDefaultKryoSerializer(EntitonQuad.class, 
>> TBaseSerializer.class);
>> 
>> Those jobs don't work when I disable generic types and I get the following 
>> exception:
>> 
>> Exception in thread "main" java.lang.UnsupportedOperationException: Generic 
>> types have been 
>> disabled in the ExecutionConfig and type xxx.EntitonAtom is treated as a 
>> generic type.
>> 
>>  I have a couple of questions:
>> addDefaultKryoSerializer differs from registerTypeWithKryoSerializer because 
>> addDefaultKryoSerializer use the passed serializer also for subclasses of 
>> the configured class. Am I right? This is not very clear in the method's 
>> Javadoc...
>> how to avoid that exception?
>> Best,
>> Flavio
>> 
>> [1] env.getConfig().disableGenericTypes();
> 
> 
> 



Re: Window Stream - Need assistance

2018-07-18 Thread vino yang
Hi Rakkesh,

As Xingcan said, the trigger is required by the window to FIRE, you can use
time window (contains a inner trigger) or (ProcessFunction + State + Timer).

Thanks, vino.

2018-07-18 21:44 GMT+08:00 Titus Rakkesh :

> Thanks Xingcan. I specified as GlobalWindow since I am going to put all
> the elements coming with splittedActivationTuple with a 24 hour expiry and
> then do operations on that when elements coming with stream
> "unionReloadsStream" (bigger set).
>
> On Wed, Jul 18, 2018 at 4:07 PM, Xingcan Cui  wrote:
>
>> Hi Rakkesh,
>>
>> The `GlobalWindow` is commonly used for custom window assignment and you
>> should specify a `trigger` for it [1].
>> If the built-in window (e.g., tumbling window or sliding window) join in
>> DataStream API fails to meet the requirements,
>> you could try the time-windowed join in Table/SQL API [2].
>>
>> Hope that helps.
>>
>> Best,
>> Xingcan
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/
>> dev/stream/operators/windows.html#global-windows
>> [2] https://ci.apache.org/projects/flink/flink-docs-master/
>> dev/table/sql.html#joins
>>
>>
>> On Jul 18, 2018, at 5:55 PM, Titus Rakkesh 
>> wrote:
>>
>> Thanks for the reply. I have called "env.execute()". But nothing getting
>> printed. I have a doubt whether "implemented function" is correct with my
>> "requirement". Please assist.
>>
>> On Wed, Jul 18, 2018 at 2:57 PM, Xingcan Cui  wrote:
>>
>>> Hi Rakkesh,
>>>
>>> Did you call `execute()`on your `StreamExecutionEnvironment`?
>>>
>>> Best,
>>> Xingcan
>>>
>>> > On Jul 18, 2018, at 5:12 PM, Titus Rakkesh 
>>> wrote:
>>> >
>>> > Dear Friends,
>>> >  I have 2 streams of the below data types.
>>> >
>>> > DataStream> splittedActivationTuple;
>>> >
>>> > DataStream> unionReloadsStream;
>>> >
>>> > These streams are getting data from Kafka and getting data in
>>> different frequencies. "unionReloadsStream"  will receive more data than
>>> "splittedActivationTuple". I need to store  "splittedActivationTuple" in a
>>> Window of 24 hours and manipulate its "Double" field, if a matching data
>>> comes from unionReloadsStream (String field is the common field).
>>> >
>>> > So I wrote the following method to do this task.
>>> >
>>> >
>>> > public static DataStream>
>>> joinActivationsBasedOnReload(
>>> > DataStream>
>>> activationsStream,
>>> > DataStream> unifiedReloadStream) {
>>> >
>>> > return activationsStream.join(unifiedReloadStream).where(new
>>> ActivationStreamSelector())
>>> > .equalTo(new ReloadStreamSelector()).window
>>> (GlobalWindows.create())
>>> > .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
>>> > .apply(new JoinFunction>> Double>, Tuple2, Tuple3>() {
>>> > private static final long serialVersionUID = 1L;
>>> > @Override
>>> > public Tuple3
>>> join(Tuple3 first,
>>> > Tuple2 second) {
>>> > return new Tuple3>> Double>(first.f0, first.f1, first.f2 + second.f1);
>>> > }
>>> > });
>>> > }
>>> >
>>> >
>>> > and calling as,
>>> >
>>> > DataStream> activationWindowStream =
>>> joinActivationsBasedOnReload(splittedActivationTuple,
>>> unionReloadsStream);
>>> >
>>> > activationWindowStream.print();
>>> >
>>> >
>>> > But I couldn't see anything printing.
>>> >
>>> > I expected "activationWindowStream" to contain the
>>> "splittedActivationTuple" (smaller set) data and the Double value
>>> accumulated if  unionReloadsStream's incoming elements have a matching
>>> "String" field. But that is not happening. Where I am missing?
>>> >
>>> > Thanks,
>>> > Rakkesh
>>>
>>>
>>
>>
>


Re: Elasticsearch 6.3.x connector

2018-07-18 Thread vino yang
To Gordon:
+1 for your efforts, considering ElasticSearch is a very popular full-text
search and analysis engine. If it could be merged into 1.6 version, that
would be very good to the community.

Thanks, vino.

2018-07-18 17:42 GMT+08:00 Tzu-Li (Gordon) Tai :

> Hi Miki,
>
> The latest stable version of the Elasticsearch connector, as of Flink
> 1.5.x, is Elasticsearch 5.x.
>
> As for Elasticsearch 6.x, there has been some PRs that has been open for a
> while and have already been discussed quite thoroughly [1] [2].
>
> Till and I have talked about merging these for 1.6 (this normally should
> not occur since we’re already past the feature freeze for 1.6), and we’ve
> agreed to try to do that.
>
> Cheers,
> Gordon
>
> [1] https://github.com/apache/flink/pull/6043
> [2] https://github.com/apache/flink/pull/5374
>
> On 16 July 2018 at 11:25:14 PM, vino yang (yanghua1...@gmail.com) wrote:
>
> Hi miki,
>
> Flink does not provide a connector for ElasticSearch 6 yet. There is this
> JIRA issue to track the development progress [1]. Based on the issues's
> status, it is in progress.
>
> Flink's documentation about the elasticsearch connector is right, current
> stable version is 5.x. And the maven repository dependency you provided
> does not mean flink depends on elasticsearch 6.3.1, which means the
> elasticsearch's latest version in maven repository.
>
> Thanks.
> vino.
>
> [1] https://issues.apache.org/jira/browse/FLINK-8101
>
> 2018-07-16 19:05 GMT+08:00 miki haiat :
>
>>
>> HI ,
>>
>> I just wondered what is to status of  the 6.3.x  elastic connector.
>> flink-connector-elasticsearch-base_2.11
>> 
>> has elastic 6.3.1 dependencies .
>>
>> Documentation mention 5.3 as the stable version  elasticsearch.html
>> 
>>
>>
>> What is the latest stable version of the elasitc connector .
>>
>>
>> Miki
>>
>>
>>
>


Race between window assignment and same window timeout

2018-07-18 Thread Shimony, Shay
Hi,

It seems like we encounter a race situation between the aggregation thread and 
the Time Trigger thread.
It might not be a bug, but it still seems strange to us, and we would like your 
help to fix it/work around it please.

First, few descriptions about our use case and system:

· We are working with processing time.

· We are using Flink 1.4.

· We use our customized sliding window of size 1 minute, slide 10 
seconds.
But we think it can happen also in tumbling window. So for simplicity, let's 
assume tumbling window of 1 minute.

· Our window Trigger does FIRE upon each element.

· We have constant 2k/sec incoming messages, balanced rate.

· When I say "window state" I mean simply our aggregation value in it.

If the timestamp of an element is very close to the end of the window, then it 
will be assigned with that window of course, but it occasionally happen that 
this window is timing out and cleared - before this element is aggregated with 
the window state, thus we lost the previous aggregation value and got new 
aggregation state with the element value.

Below is the story as seen by the threads.
Timestamps are logical.

Suppose we are in the beginning of WindowOperator.processElement.
Current time: 119 (nearly 120)

Reducer thread

Time Trigger thread

Assign element to window [60, 120],

because context.getCurrentProcessingTime()

Returned 119 (in assignWindows)





Time is 120 --> clear window state

Add the element value to window state [60, 120] (it starts from new state)




Our questions:

1.   Is it a legitimate race? (We expected that (1) assigning element to a 
window + aggregating it to its state, and (2) clearing the window - would be 
atomic to each other - that is, if an element is valid for a window, then it 
will be assigned to it and aggregated fully into its state, and only then 
window clear can happen).

2.   How could we make the Time Trigger thread wait a little bit with the 
window cleaning? Like adding 500ms to clean window time schedule.
We thought to override WindowOperator.cleanupTime, so is it possible to easily 
replace WindowOperator with ours?

3.   Maybe you have different idea to work around it?

Thanks!
Shay



Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks

2018-07-18 Thread Andrey Zagrebin
Hi Gerard,

There is an issue recently fixed for 1.5.2, 1.6.0:
https://issues.apache.org/jira/browse/FLINK-9575 

It might have caused your problem.

Can you please provide log from JobManager/Entry point for further 
investigation?

Cheers,
Andrey

> On 18 Jul 2018, at 10:16, Gerard Garcia  wrote:
> 
> Hi vino,
> 
> Seems that jobs id stay in /jobgraphs when we cancel them manually. For 
> example, after cancelling the job with id 75e16686cb4fe0d33ead8e29af131d09 
> the entry is still in zookeeper's path /flink/default/jobgraphs, but the job 
> disappeared from /home/nas/flink/ha/default/blob/.
> 
> That is the client log:
> 
> 09:20:58.492 [main] INFO  org.apache.flink.client.cli.CliFrontend  - 
> Cancelling job 75e16686cb4fe0d33ead8e29af131d09.
> 09:20:58.503 [main] INFO  org.apache.flink.runtime.blob.FileSystemBlobStore  
> - Creating highly available BLOB storage directory at 
> file:///home/nas/flink/ha//default/blob
> 09:20:58.505 [main] INFO  org.apache.flink.runtime.util.ZooKeeperUtils  - 
> Enforcing default ACL for ZK connections
> 09:20:58.505 [main] INFO  org.apache.flink.runtime.util.ZooKeeperUtils  - 
> Using '/flink-eur/default' as Zookeeper namespace.
> 09:20:58.539 [main] INFO  
> o.a.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl  - Starting
> 09:20:58.543 [main] INFO  
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client 
> environment:zookeeper.version=
> 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
> 09:20:58.543 [main] INFO  
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client 
> environment:host.name =flink-eur-production1
> 09:20:58.543 [main] INFO  
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client 
> environment:java.version=1.8.0_131
> 09:20:58.544 [main] INFO  
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client 
> environment:java.vendor=Oracle Corporation
> 09:20:58.546 [main] INFO  
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client 
> environment:java.home=/opt/jdk/jdk1.8.0_131/jre
> 09:20:58.546 [main] INFO  
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client 
> environment:java.class.path=/opt/flink/flink-1.5.0/lib/commons-httpclient-3.1.jar:/opt/flink/flink-1.5.0/lib/flink-metrics-statsd-1.5.0.jar:/opt/flink/flink-1.5.0/lib/flink-python_2.11-1.5.0.jar:/opt/flink/flink-1.5.0/lib/fluency-1.8.0.jar:/opt/flink/flink-1.5.0/lib/gcs-connector-latest-hadoop2.jar:/opt/flink/flink-1.5.0/lib/hadoop-openstack-2.7.1.jar:/opt/flink/flink-1.5.0/lib/jackson-annotations-2.8.0.jar:/opt/flink/flink-1.5.0/lib/jackson-core-2.8.10.jar:/opt/flink/flink-1.5.0/lib/jackson-databind-2.8.11.1.jar:/opt/flink/flink-1.5.0/lib/jackson-dataformat-msgpack-0.8.15.jar:/opt/flink/flink-1.5.0/lib/log4j-1.2.17.jar:/opt/flink/flink-1.5.0/lib/log4j-over-slf4j-1.7.25.jar:/opt/flink/flink-1.5.0/lib/logback-classic-1.2.3.jar:/opt/flink/flink-1.5.0/lib/logback-core-1.2.3.jar:/opt/flink/flink-1.5.0/lib/logback-more-appenders-1.4.2.jar:/opt/flink/flink-1.5.0/lib/msgpack-0.6.12.jar:/opt/flink/flink-1.5.0/lib/msgpack-core-0.8.15.jar:/opt/flink/flink-1.5.0/lib/phi-accural-failure-detector-0.0.4.jar:/opt/flink/flink-1.5.0/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.5.0/lib/flink-dist_2.11-1.5.0.jar:::
> 09:20:58.546 [main] INFO  
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client 
> environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
> 09:20:58.546 [main] INFO  
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client 
> environment:java.io.tmpdir=/tmp
> 09:20:58.546 [main] INFO  
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client 
> environment:java.compiler=
> 09:20:58.547 [main] INFO  
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client 
> environment:os.name =Linux
> 09:20:58.547 [main] INFO  
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client 
> environment:os.arch=amd64
> 09:20:58.547 [main] INFO  
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client 
> environment:os.version=4.9.87--std-ipv6-64
> 09:20:58.547 [main] INFO  
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client 
> environment:user.name =root
> 09:20:58.547 [main] INFO  
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client 
> environment:user.home=/root
> 09:20:58.547 [main] INFO  
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client 
> environment:user.dir=/opt/flink/flink-1.5.0/bin
> 09:20:58.548 [main] INFO  
> o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Initiating 
> client connection, connectString=10.1.1.5:2181 
> ,10.1.1.6:2181 ,10.1.1.7:2181 
>  sessionTimeout=6 
> 

Re: Window Stream - Need assistance

2018-07-18 Thread Titus Rakkesh
Thanks Xingcan. I specified as GlobalWindow since I am going to put all the
elements coming with splittedActivationTuple with a 24 hour expiry and then
do operations on that when elements coming with stream "unionReloadsStream"
(bigger set).

On Wed, Jul 18, 2018 at 4:07 PM, Xingcan Cui  wrote:

> Hi Rakkesh,
>
> The `GlobalWindow` is commonly used for custom window assignment and you
> should specify a `trigger` for it [1].
> If the built-in window (e.g., tumbling window or sliding window) join in
> DataStream API fails to meet the requirements,
> you could try the time-windowed join in Table/SQL API [2].
>
> Hope that helps.
>
> Best,
> Xingcan
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/stream/operators/windows.html#global-windows
> [2] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/table/sql.html#joins
>
>
> On Jul 18, 2018, at 5:55 PM, Titus Rakkesh 
> wrote:
>
> Thanks for the reply. I have called "env.execute()". But nothing getting
> printed. I have a doubt whether "implemented function" is correct with my
> "requirement". Please assist.
>
> On Wed, Jul 18, 2018 at 2:57 PM, Xingcan Cui  wrote:
>
>> Hi Rakkesh,
>>
>> Did you call `execute()`on your `StreamExecutionEnvironment`?
>>
>> Best,
>> Xingcan
>>
>> > On Jul 18, 2018, at 5:12 PM, Titus Rakkesh 
>> wrote:
>> >
>> > Dear Friends,
>> >  I have 2 streams of the below data types.
>> >
>> > DataStream> splittedActivationTuple;
>> >
>> > DataStream> unionReloadsStream;
>> >
>> > These streams are getting data from Kafka and getting data in different
>> frequencies. "unionReloadsStream"  will receive more data than
>> "splittedActivationTuple". I need to store  "splittedActivationTuple" in a
>> Window of 24 hours and manipulate its "Double" field, if a matching data
>> comes from unionReloadsStream (String field is the common field).
>> >
>> > So I wrote the following method to do this task.
>> >
>> >
>> > public static DataStream>
>> joinActivationsBasedOnReload(
>> > DataStream>
>> activationsStream,
>> > DataStream> unifiedReloadStream) {
>> >
>> > return activationsStream.join(unifiedReloadStream).where(new
>> ActivationStreamSelector())
>> > .equalTo(new ReloadStreamSelector()).window
>> (GlobalWindows.create())
>> > .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
>> > .apply(new JoinFunction> Double>, Tuple2, Tuple3>() {
>> > private static final long serialVersionUID = 1L;
>> > @Override
>> > public Tuple3
>> join(Tuple3 first,
>> > Tuple2 second) {
>> > return new Tuple3> Double>(first.f0, first.f1, first.f2 + second.f1);
>> > }
>> > });
>> > }
>> >
>> >
>> > and calling as,
>> >
>> > DataStream> activationWindowStream =
>> joinActivationsBasedOnReload(splittedActivationTuple,
>> unionReloadsStream);
>> >
>> > activationWindowStream.print();
>> >
>> >
>> > But I couldn't see anything printing.
>> >
>> > I expected "activationWindowStream" to contain the
>> "splittedActivationTuple" (smaller set) data and the Double value
>> accumulated if  unionReloadsStream's incoming elements have a matching
>> "String" field. But that is not happening. Where I am missing?
>> >
>> > Thanks,
>> > Rakkesh
>>
>>
>
>


Re: Flink 1.5 job distribution over cluster nodes

2018-07-18 Thread Vishal Santoshi
I think there is something to be said about making this distribution more
flexible.  A stand alone cluster, still the distribution mechanism for many
a installations suffers horribly with the above approach. A healthy cluster
requires resources wot be used equitably is possible. I have some pipes
that are CPU intense and some not so much. Slots being the primary unit of
parallelism does not differentiate that much between a stress inducing pipe
or other wise and thus having them spread out becomes essential to avoid
skews.

Maybe a simple setting could have forked different approaches to slot
distribution. standalone = true for example.  I have a n node cluster with
2 nodes being heavily stressed ( LA approaching the slot size ) b'coz of
this new setup and that does not seem right. Ot is still a physical node
that that these process run on with local drives and so on.






On Wed, Jul 18, 2018, 7:54 AM scarmeli  wrote:

> Answered in another mailing list
>
> Hi Shachar,
>
> with Flink 1.5 we added resource elasticity. This means that Flink is now
> able to allocate new containers on a cluster management framework like
> Yarn
> or Mesos. Due to these changes (which also apply to the standalone mode),
> Flink no longer reasons about a fixed set of TaskManagers because if
> needed
> it will start new containers (does not work in standalone mode).
> Therefore,
> it is hard for the system to make any decisions about spreading slots
> belonging to a single job out across multiple TMs. It gets even harder
> when
> you consider that some jobs like yours might benefit from such a strategy
> whereas others would benefit from co-locating its slots. It gets even more
> complicated if you want to do scheduling wrt to multiple jobs which the
> system does not have full knowledge about because they are submitted
> sequentially. Therefore, Flink currently assumes that slots requests can
> be
> fulfilled by any TaskManager.
>
> Cheers,
> Till
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Cannot configure akka.ask.timeout

2018-07-18 Thread Vishal Santoshi
Scratch that... that is a different exception

On Wed, Jul 18, 2018, 8:40 AM Vishal Santoshi 
wrote:

> That stumped us too and I am not sure but could you set up web.timeout
> higher then t he default 10s. We had issues with timeouts on job submission
> and were advised to change web.timeout, job submission now being an RPC
> call.. please do let us know if that helps...
>
> On Wed, Jul 18, 2018, 5:11 AM Yun Tang  wrote:
>
>> Hi Lukas
>>
>> From your first two steps' description ("started this in Intellij") and
>> the exception log, I think you run your program locally within Intellij
>> with LocalStreamEnvironment. You can view the configuration related code
>> from org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
>> below:
>>
>> Configuration configuration = new Configuration();
>> configuration.addAll(jobGraph.getJobConfiguration());
>> configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
>>
>> // add (and override) the settings with what the user defined
>> configuration.addAll(this.configuration);
>>
>> if (!configuration.contains(RestOptions.PORT)) {
>>configuration.setInteger(RestOptions.PORT, 0);
>> }
>>
>> int numSlotsPerTaskManager = 
>> configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
>> jobGraph.getMaximumParallelism());
>>
>> MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
>>.setConfiguration(configuration)
>>.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
>>.build();
>>
>> Unluckily, from the above code, I don't think you might be able to set
>> specific akka-timout if you don't change this class (if I'm wrong, please
>> correct me), the easiest way is just to change the ASK_TIMEOUT's default
>> value within org/apache/flink/configuration/AkkaOptions.java from "10 s" to
>> "100 s".
>>
>> Best
>> Yun
>>
>> --
>> *From:* Lukas Kircher 
>> *Sent:* Wednesday, July 18, 2018 14:47
>> *To:* user
>> *Subject:* Re: Cannot configure akka.ask.timeout
>>
>> Hello,
>>
>> does anybody have an idea what is going on? I have not yet found a
>> solution.
>>
>> Am I doing something wrong? Or is the 'akka.ask.timeout' parameter not
>> related to the exception stated below?
>>
>> Could somebody please take a look at this? More details can be found in
>> the message prior to this.
>>
>> *akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]]
>> after [1 ms]*
>>
>> Best regards,
>> Lukas
>>
>>
>> On 13. Jul 2018, at 12:24, Lukas Kircher  wrote:
>>
>> Hello,
>>
>> I have problems setting configuration parameters for Akka in Flink 1.5.0.
>> When I run a job I get the exception listed below which states that Akka
>> timed out after 1ms. I tried to increase the timeout by following the
>> Flink configuration documentation. Specifically I did the following:
>>
>> 1) Passed a configuration to the Flink execution environment with
>> `akka.ask.timeout` set to a higher value. I started this in Intellij.
>> 2) Passed program arguments via the run configuration in Intellij,
>> e.g. `-Dakka.ask.timeout:100s`
>> 3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a local
>> standalone cluster via start-cluster.sh. The setting is reflected in
>> Flink's web interface.
>>
>> However - despite explicit configuration the default setting seems to be
>> used. The exception below states in each case that akka ask timed out after
>> 1ms.
>>
>> As my problem seems very basic I do not include an SSCCE for now but I
>> can try to build one if this helps figuring out the issue.
>>
>> --
>> *[...]*
>> *Exception in thread "main"
>> org.apache.flink.runtime.client.JobExecutionException: Could not retrieve
>> JobResult.*
>> *[...]*
>> *at
>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)*
>> *at
>> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)*
>> *at
>> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)*
>> *at
>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)*
>> *at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)*
>> *[...]*
>> *Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]]
>> after [1 ms]. Sender[null] sent message of type
>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".*
>> *at
>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)*
>> *at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)*
>> *at
>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)*
>> *at
>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)*
>> *at
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)*
>> *at
>> 

Re: Cannot configure akka.ask.timeout

2018-07-18 Thread Vishal Santoshi
That stumped us too and I am not sure but could you set up web.timeout
higher then t he default 10s. We had issues with timeouts on job submission
and were advised to change web.timeout, job submission now being an RPC
call.. please do let us know if that helps...

On Wed, Jul 18, 2018, 5:11 AM Yun Tang  wrote:

> Hi Lukas
>
> From your first two steps' description ("started this in Intellij") and
> the exception log, I think you run your program locally within Intellij
> with LocalStreamEnvironment. You can view the configuration related code
> from org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
> below:
>
> Configuration configuration = new Configuration();
> configuration.addAll(jobGraph.getJobConfiguration());
> configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
>
> // add (and override) the settings with what the user defined
> configuration.addAll(this.configuration);
>
> if (!configuration.contains(RestOptions.PORT)) {
>configuration.setInteger(RestOptions.PORT, 0);
> }
>
> int numSlotsPerTaskManager = 
> configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
> jobGraph.getMaximumParallelism());
>
> MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
>.setConfiguration(configuration)
>.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
>.build();
>
> Unluckily, from the above code, I don't think you might be able to set
> specific akka-timout if you don't change this class (if I'm wrong, please
> correct me), the easiest way is just to change the ASK_TIMEOUT's default
> value within org/apache/flink/configuration/AkkaOptions.java from "10 s" to
> "100 s".
>
> Best
> Yun
>
> --
> *From:* Lukas Kircher 
> *Sent:* Wednesday, July 18, 2018 14:47
> *To:* user
> *Subject:* Re: Cannot configure akka.ask.timeout
>
> Hello,
>
> does anybody have an idea what is going on? I have not yet found a
> solution.
>
> Am I doing something wrong? Or is the 'akka.ask.timeout' parameter not
> related to the exception stated below?
>
> Could somebody please take a look at this? More details can be found in
> the message prior to this.
>
> *akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]]
> after [1 ms]*
>
> Best regards,
> Lukas
>
>
> On 13. Jul 2018, at 12:24, Lukas Kircher  wrote:
>
> Hello,
>
> I have problems setting configuration parameters for Akka in Flink 1.5.0.
> When I run a job I get the exception listed below which states that Akka
> timed out after 1ms. I tried to increase the timeout by following the
> Flink configuration documentation. Specifically I did the following:
>
> 1) Passed a configuration to the Flink execution environment with
> `akka.ask.timeout` set to a higher value. I started this in Intellij.
> 2) Passed program arguments via the run configuration in Intellij,
> e.g. `-Dakka.ask.timeout:100s`
> 3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a local
> standalone cluster via start-cluster.sh. The setting is reflected in
> Flink's web interface.
>
> However - despite explicit configuration the default setting seems to be
> used. The exception below states in each case that akka ask timed out after
> 1ms.
>
> As my problem seems very basic I do not include an SSCCE for now but I can
> try to build one if this helps figuring out the issue.
>
> --
> *[...]*
> *Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Could not retrieve
> JobResult.*
> *[...]*
> *at
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)*
> *at
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)*
> *at
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)*
> *at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)*
> *at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)*
> *[...]*
> *Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]]
> after [1 ms]. Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".*
> *at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)*
> *at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)*
> *at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)*
> *at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)*
> *at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)*
> *at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)*
> *at
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)*
> *at
> 

Re: event time and late events - documentation

2018-07-18 Thread vino yang
Hi Tovi,

1. approach 1 : change the watermark let it means the later time, window
trigger early.
2. approach 2 : do not change the watermark, but specify the allow lateness
for the window[1][2], this way can make window accept late element and
re-fire the window after watermark.

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#late-elements
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#allowed-lateness

Thanks, vino.

2018-07-16 23:28 GMT+08:00 Elias Levy :

> Tovi,
>
> The document here
> 
> should answer your question.  If it doesn't, please let me know.
>
> On Mon, Jul 16, 2018 at 5:17 AM Sofer, Tovi  wrote:
>
>> Hi group,
>>
>> Can someone please elaborate on the comment at the end of section
>> “Debugging Windows & Event Time”?
>>
>> Didn’t understand it meaning.
>>
>> https://ci.apache.org/projects/flink/flink-docs-
>> master/monitoring/debugging_event_time.html
>>
>> *“Handling Event Time Stragglers*
>>
>> *Approach 1: Watermark stays late (indicated completeness), windows fire
>> early*
>>
>> *Approach 2: Watermark heuristic with maximum lateness, windows accept
>> late data”*
>>
>>
>>
>> Thanks,
>>
>> Tovi
>>
>


Re: FlinkKafkaConsumer configuration to consume from Multiple Kafka Topics

2018-07-18 Thread Andrey Zagrebin
Hi Sagar,

At the moment number of partitions in Kafka source topics and parallelism of 
Flink Kafka source operator are completely independent. Flink will internally 
distribute partitions between a number of source parallel subtasks which you 
configure. In case of dynamic partition or topic discovery while running it 
also happens automatically.

Job or source parallelism can be set e.g. to the total number of Kafka 
partitions over all topics known in advanced, if programmatically then e.g. 
using Kafka client.

Cheers,
Andrey

> On 18 Jul 2018, at 07:54, sagar loke  wrote:
> 
> Hi,
> 
> We have a use case where we are consuming from more than 100s of Kafka 
> Topics. Each topic has different number of partitions. 
> 
> As per the documentation, to parallelize a Kafka Topic, we need to use 
> setParallelism() == number of Kafka Partitions for a topic. 
> 
> But if we are consuming multiple topics in Flink by providing pattern eg. 
> my_topic_* and for each topic if there is different configuration for 
> partitions, 
> 
> then how should we connect all these together so that we can map Kafka 
> Partition to Flink Parallelization correctly and programmatically (so that we 
> don't have to hard code all the topic names and parallelism -- considering we 
> can access kafka topic <-> number of partitions mapping in Flink) ?
> 
> Thanks,



Re: Flink 1.5 job distribution over cluster nodes

2018-07-18 Thread scarmeli
Answered in another mailing list

Hi Shachar, 

with Flink 1.5 we added resource elasticity. This means that Flink is now 
able to allocate new containers on a cluster management framework like Yarn 
or Mesos. Due to these changes (which also apply to the standalone mode), 
Flink no longer reasons about a fixed set of TaskManagers because if needed 
it will start new containers (does not work in standalone mode). Therefore, 
it is hard for the system to make any decisions about spreading slots 
belonging to a single job out across multiple TMs. It gets even harder when 
you consider that some jobs like yours might benefit from such a strategy 
whereas others would benefit from co-locating its slots. It gets even more 
complicated if you want to do scheduling wrt to multiple jobs which the 
system does not have full knowledge about because they are submitted 
sequentially. Therefore, Flink currently assumes that slots requests can be 
fulfilled by any TaskManager. 

Cheers, 
Till 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Keeping only latest row by key?

2018-07-18 Thread Timo Walther

Hi James,

the easiest solution for this bahavior is to use a user-defined 
LAST_VALUE aggregate function as discussed here [1].


I hope this helps.

Regards,
Timo

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Using-SQL-with-dynamic-tables-where-rows-are-updated-td20519.html



Am 18.07.18 um 12:54 schrieb Andrey Zagrebin:

Hi James,

There are over windows in Flink Table API:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/tableApi.html#over-windows
It should be possible to implement this behaviour using them.

Cheers,
Andrey

On 17 Jul 2018, at 18:27, Porritt, James > wrote:


In Spark if I want to be able to get a set of unique rows by id, 
using the criteria of keeping the row with the latest timestamp, I 
would do the following:

.withColumn("rn",
F.row_number().over(
Window.partitionBy(‘id’) \
.orderBy(F.col('timestamp').desc())
)
) \
.where(F.col("rn") == 1)
I see Flink has windowing functionality, but I don’t see it has row 
enumeration? How best in that case would I achieve the above?

Thanks,
James.
##
The information contained in this communication is confidential and
intended only for the individual(s) named above. If you are not a named
addressee, please notify the sender immediately and delete this email
from your system and do not disclose the email or any part of it to any
person. The views expressed in this email are the views of the author
and do not necessarily represent the views of Millennium Capital 
Partners

LLP (MCP LLP) or any of its affiliates. Outgoing and incoming electronic
communications of MCP LLP and its affiliates, including telephone
communications, may be electronically archived and subject to review
and/or disclosure to someone other than the recipient. MCP LLP is
authorized and regulated by the Financial Conduct Authority. Millennium
Capital Partners LLP is a limited liability partnership registered in
England & Wales with number OC312897 and with its registered office at
50 Berkeley Street, London, W1J 8HD.
##






Re: Keeping only latest row by key?

2018-07-18 Thread Andrey Zagrebin
Hi James,

There are over windows in Flink Table API:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/tableApi.html#over-windows
 

It should be possible to implement this behaviour using them.

Cheers,
Andrey

> On 17 Jul 2018, at 18:27, Porritt, James  wrote:
> 
> In Spark if I want to be able to get a set of unique rows by id, using the 
> criteria of keeping the row with the latest timestamp, I would do the 
> following:
>  
> .withColumn("rn",
> F.row_number().over(
> Window.partitionBy(‘id’) \
> .orderBy(F.col('timestamp').desc())
> )
> ) \
> .where(F.col("rn") == 1)
>  
> I see Flink has windowing functionality, but I don’t see it has row 
> enumeration? How best in that case would I achieve the above?
>  
> Thanks,
> James.
> ##
> The information contained in this communication is confidential and
> intended only for the individual(s) named above. If you are not a named
> addressee, please notify the sender immediately and delete this email
> from your system and do not disclose the email or any part of it to any
> person. The views expressed in this email are the views of the author
> and do not necessarily represent the views of Millennium Capital Partners
> LLP (MCP LLP) or any of its affiliates. Outgoing and incoming electronic
> communications of MCP LLP and its affiliates, including telephone
> communications, may be electronically archived and subject to review
> and/or disclosure to someone other than the recipient. MCP LLP is
> authorized and regulated by the Financial Conduct Authority. Millennium
> Capital Partners LLP is a limited liability partnership registered in
> England & Wales with number OC312897 and with its registered office at
> 50 Berkeley Street, London, W1J 8HD.
> ##



Re: Window Stream - Need assistance

2018-07-18 Thread Xingcan Cui
Hi Rakkesh,

The `GlobalWindow` is commonly used for custom window assignment and you should 
specify a `trigger` for it [1].
If the built-in window (e.g., tumbling window or sliding window) join in 
DataStream API fails to meet the requirements,
you could try the time-windowed join in Table/SQL API [2].

Hope that helps.

Best,
Xingcan 

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#global-windows
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins 



> On Jul 18, 2018, at 5:55 PM, Titus Rakkesh  wrote:
> 
> Thanks for the reply. I have called "env.execute()". But nothing getting 
> printed. I have a doubt whether "implemented function" is correct with my 
> "requirement". Please assist. 
> 
> On Wed, Jul 18, 2018 at 2:57 PM, Xingcan Cui  > wrote:
> Hi Rakkesh,
> 
> Did you call `execute()`on your `StreamExecutionEnvironment`?
> 
> Best,
> Xingcan 
> 
> > On Jul 18, 2018, at 5:12 PM, Titus Rakkesh  > > wrote:
> > 
> > Dear Friends,
> >  I have 2 streams of the below data types.
> > 
> > DataStream> splittedActivationTuple;
> > 
> > DataStream> unionReloadsStream;
> > 
> > These streams are getting data from Kafka and getting data in different 
> > frequencies. "unionReloadsStream"  will receive more data than 
> > "splittedActivationTuple". I need to store  "splittedActivationTuple" in a 
> > Window of 24 hours and manipulate its "Double" field, if a matching data 
> > comes from unionReloadsStream (String field is the common field).
> > 
> > So I wrote the following method to do this task.
> > 
> > 
> > public static DataStream> 
> > joinActivationsBasedOnReload(
> > DataStream> activationsStream,
> > DataStream> unifiedReloadStream) {
> > 
> > return activationsStream.join(unifiedReloadStream).where(new 
> > ActivationStreamSelector())
> > .equalTo(new 
> > ReloadStreamSelector()).window(GlobalWindows.create())
> > .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
> > .apply(new JoinFunction, 
> > Tuple2, Tuple3>() {
> > private static final long serialVersionUID = 1L;
> > @Override
> > public Tuple3 
> > join(Tuple3 first,
> > Tuple2 second) {
> > return new Tuple3 > Double>(first.f0, first.f1, first.f2 + second.f1);
> > }
> > });
> > }
> > 
> > 
> > and calling as,
> > 
> > DataStream> activationWindowStream = 
> > joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);
> > 
> > activationWindowStream.print();
> > 
> > 
> > But I couldn't see anything printing. 
> > 
> > I expected "activationWindowStream" to contain the 
> > "splittedActivationTuple" (smaller set) data and the Double value 
> > accumulated if  unionReloadsStream's incoming elements have a matching 
> > "String" field. But that is not happening. Where I am missing?
> > 
> > Thanks,
> > Rakkesh
> 
> 



Re: Window Stream - Need assistance

2018-07-18 Thread Titus Rakkesh
Thanks for the reply. I have called "env.execute()". But nothing getting
printed. I have a doubt whether "implemented function" is correct with my
"requirement". Please assist.

On Wed, Jul 18, 2018 at 2:57 PM, Xingcan Cui  wrote:

> Hi Rakkesh,
>
> Did you call `execute()`on your `StreamExecutionEnvironment`?
>
> Best,
> Xingcan
>
> > On Jul 18, 2018, at 5:12 PM, Titus Rakkesh 
> wrote:
> >
> > Dear Friends,
> >  I have 2 streams of the below data types.
> >
> > DataStream> splittedActivationTuple;
> >
> > DataStream> unionReloadsStream;
> >
> > These streams are getting data from Kafka and getting data in different
> frequencies. "unionReloadsStream"  will receive more data than
> "splittedActivationTuple". I need to store  "splittedActivationTuple" in a
> Window of 24 hours and manipulate its "Double" field, if a matching data
> comes from unionReloadsStream (String field is the common field).
> >
> > So I wrote the following method to do this task.
> >
> >
> > public static DataStream>
> joinActivationsBasedOnReload(
> > DataStream>
> activationsStream,
> > DataStream> unifiedReloadStream) {
> >
> > return activationsStream.join(unifiedReloadStream).where(new
> ActivationStreamSelector())
> > .equalTo(new ReloadStreamSelector()).
> window(GlobalWindows.create())
> > .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
> > .apply(new JoinFunction,
> Tuple2, Tuple3>() {
> > private static final long serialVersionUID = 1L;
> > @Override
> > public Tuple3
> join(Tuple3 first,
> > Tuple2 second) {
> > return new Tuple3 Double>(first.f0, first.f1, first.f2 + second.f1);
> > }
> > });
> > }
> >
> >
> > and calling as,
> >
> > DataStream> activationWindowStream =
> joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);
> >
> > activationWindowStream.print();
> >
> >
> > But I couldn't see anything printing.
> >
> > I expected "activationWindowStream" to contain the
> "splittedActivationTuple" (smaller set) data and the Double value
> accumulated if  unionReloadsStream's incoming elements have a matching
> "String" field. But that is not happening. Where I am missing?
> >
> > Thanks,
> > Rakkesh
>
>


Re: Elasticsearch 6.3.x connector

2018-07-18 Thread Tzu-Li (Gordon) Tai
Hi Miki,

The latest stable version of the Elasticsearch connector, as of Flink 1.5.x, is 
Elasticsearch 5.x.

As for Elasticsearch 6.x, there has been some PRs that has been open for a 
while and have already been discussed quite thoroughly [1] [2].

Till and I have talked about merging these for 1.6 (this normally should not 
occur since we’re already past the feature freeze for 1.6), and we’ve agreed to 
try to do that.

Cheers,
Gordon

[1] https://github.com/apache/flink/pull/6043
[2] https://github.com/apache/flink/pull/5374

On 16 July 2018 at 11:25:14 PM, vino yang (yanghua1...@gmail.com) wrote:

Hi miki,

Flink does not provide a connector for ElasticSearch 6 yet. There is this JIRA 
issue to track the development progress [1]. Based on the issues's status, it 
is in progress. 

Flink's documentation about the elasticsearch connector is right, current 
stable version is 5.x. And the maven repository dependency you provided does 
not mean flink depends on elasticsearch 6.3.1, which means the elasticsearch's 
latest version in maven repository.

Thanks. 
vino.

[1] https://issues.apache.org/jira/browse/FLINK-8101

2018-07-16 19:05 GMT+08:00 miki haiat :

HI ,

I just wondered what is to status of  the 6.3.x  elastic connector.
flink-connector-elasticsearch-base_2.11  has elastic 6.3.1 dependencies .

Documentation mention 5.3 as the stable version  elasticsearch.html  

What is the latest stable version of the elasitc connector .


Miki





Re: Window Stream - Need assistance

2018-07-18 Thread Xingcan Cui
Hi Rakkesh,

Did you call `execute()`on your `StreamExecutionEnvironment`?

Best,
Xingcan 

> On Jul 18, 2018, at 5:12 PM, Titus Rakkesh  wrote:
> 
> Dear Friends,
>  I have 2 streams of the below data types.
> 
> DataStream> splittedActivationTuple;
> 
> DataStream> unionReloadsStream;
> 
> These streams are getting data from Kafka and getting data in different 
> frequencies. "unionReloadsStream"  will receive more data than 
> "splittedActivationTuple". I need to store  "splittedActivationTuple" in a 
> Window of 24 hours and manipulate its "Double" field, if a matching data 
> comes from unionReloadsStream (String field is the common field).
> 
> So I wrote the following method to do this task.
> 
> 
> public static DataStream> 
> joinActivationsBasedOnReload(
> DataStream> activationsStream,
> DataStream> unifiedReloadStream) {
> 
> return activationsStream.join(unifiedReloadStream).where(new 
> ActivationStreamSelector())
> .equalTo(new 
> ReloadStreamSelector()).window(GlobalWindows.create())
> .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
> .apply(new JoinFunction, 
> Tuple2, Tuple3>() {
> private static final long serialVersionUID = 1L;
> @Override
> public Tuple3 
> join(Tuple3 first,
> Tuple2 second) {
> return new Tuple3(first.f0, 
> first.f1, first.f2 + second.f1);
> }
> });
> }
> 
> 
> and calling as,
> 
> DataStream> activationWindowStream = 
> joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);
> 
> activationWindowStream.print();
> 
> 
> But I couldn't see anything printing. 
> 
> I expected "activationWindowStream" to contain the "splittedActivationTuple" 
> (smaller set) data and the Double value accumulated if  unionReloadsStream's 
> incoming elements have a matching "String" field. But that is not happening. 
> Where I am missing?
> 
> Thanks,
> Rakkesh



Re: Serialization questions

2018-07-18 Thread Flavio Pompermaier
Hi Minglei,
using the registerTypeWithKryoSerializer with the 3 classes works
(without disableGenericTypes)
but the problem is that I would like to avoid Kryo serialization if this is
useful to speedup the job performance,
and thus I'd like to be able to run all jobs with disableGenericTypes.

Best,
Flavio

On Wed, Jul 18, 2018 at 11:10 AM, zhangminglei <18717838...@163.com> wrote:

> Hi, Flavio
>
>
>- addDefaultKryoSerializer differs from registerTypeWithKryoSerializer
>because addDefaultKryoSerializer use the passed serializer also for
>subclasses of the configured class. Am I right? This is not very clear in
>the method's Javadoc…
>
>
> I think it is not exactly a problem with flink. Instead of a kryo problem.
> For example, addDefaultKryoSerializer corresponding to the
> addDefaultSerializer(int[].class, IntArraySerializer.class) in kryo,
> whereas registerTypeWithKryoSerializer corresponding to the
> register(int.class, new IntSerializer()) in kryo.With register, you
> explicitly assign an id for that type plus serializer. The default
> serializer just tells kryo which serializer to use when this type has to be
> serialized, kryo will then implicitly register the serializer. And the
> advantage of using register would be [1]. when setting
> setRegistrationRequired(true), which is recommended (and will be the
> default in 5.0), you'd have to register every occurring type explicitly.
>
>
>- how to avoid that exception?
>
> You can try below and do not make disableGenericTypes and see what happens.
>
> env.registerTypeWithKryoSerializer(DateTime.class,
> JodaDateTimeSerializer.class);
> env.registerTypeWithKryoSerializer(EntitonAtom.class,
> TBaseSerializer.class);
> env.registerTypeWithKryoSerializer(EntitonQuad.class,
> TBaseSerializer.class);
>
>
> [1] https://github.com/EsotericSoftware/kryo/blob/
> master/README.md#registration
>
> Cheers
> Minglei
>
>
>
>
>
>
> 在 2018年7月17日,下午9:00,Flavio Pompermaier  写道:
>
> Hi to all,
> I was trying to check whether our jobs are properly typed or not.
> I've started disabling generic types[1] in order to discover untyped
> transformations and so I added the proper returns() to operators.
>
> Unfortunately there are jobs where we serialize Thrift and DateTime
> objects, so I need to properly configure the serializers in the
> ExecutionEnvironment:
>
> env.registerTypeWithKryoSerializer(DateTime.class,
> JodaDateTimeSerializer.class);
> env.getConfig().addDefaultKryoSerializer(EntitonAtom.class,
> TBaseSerializer.class);
> env.getConfig().addDefaultKryoSerializer(EntitonQuad.class,
> TBaseSerializer.class);
>
> Those jobs don't work when I disable generic types and I get the following
> exception:
>
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Generic types have been
> disabled in the ExecutionConfig and type xxx.EntitonAtom is treated as a
> generic type.
>
>  I have a couple of questions:
>
>- addDefaultKryoSerializer differs from registerTypeWithKryoSerializer
>because addDefaultKryoSerializer use the passed serializer also for
>subclasses of the configured class. Am I right? This is not very clear in
>the method's Javadoc...
>- how to avoid that exception?
>
> Best,
> Flavio
>
> [1] env.getConfig().disableGenericTypes();
>
>
>


Window Stream - Need assistance

2018-07-18 Thread Titus Rakkesh
Dear Friends,
 I have 2 streams of the below data types.

DataStream> splittedActivationTuple;

DataStream> unionReloadsStream;

These streams are getting data from Kafka and getting data in different
frequencies. "unionReloadsStream"  will receive more data than
"splittedActivationTuple". I need to store  "splittedActivationTuple" in a
Window of 24 hours and manipulate its "Double" field, if a matching data
comes from unionReloadsStream (String field is the common field).

So I wrote the following method to do this task.


public static DataStream>
joinActivationsBasedOnReload(
DataStream> activationsStream,
DataStream> unifiedReloadStream) {

return activationsStream.join(unifiedReloadStream).where(new
ActivationStreamSelector())
.equalTo(new
ReloadStreamSelector()).window(GlobalWindows.create())
.evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
.apply(new JoinFunction,
Tuple2, Tuple3>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple3
join(Tuple3 first,
Tuple2 second) {
return new Tuple3(first.f0, first.f1, first.f2 + second.f1);
}
});
}


and calling as,

DataStream> activationWindowStream =
joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);

activationWindowStream.print();


But I couldn't see anything printing.

I expected "activationWindowStream" to contain the
"splittedActivationTuple" (smaller set) data and the Double value
accumulated if  unionReloadsStream's incoming elements have a matching
"String" field. But that is not happening. Where I am missing?

Thanks,
Rakkesh


Re: Cannot configure akka.ask.timeout

2018-07-18 Thread Yun Tang
Hi Lukas

>From your first two steps' description ("started this in Intellij") and the 
>exception log, I think you run your program locally within Intellij with 
>LocalStreamEnvironment. You can view the configuration related code from 
>org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java below:


Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());
configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);

// add (and override) the settings with what the user defined
configuration.addAll(this.configuration);

if (!configuration.contains(RestOptions.PORT)) {
   configuration.setInteger(RestOptions.PORT, 0);
}

int numSlotsPerTaskManager = 
configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
jobGraph.getMaximumParallelism());

MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
   .setConfiguration(configuration)
   .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
   .build();

Unluckily, from the above code, I don't think you might be able to set specific 
akka-timout if you don't change this class (if I'm wrong, please correct me), 
the easiest way is just to change the ASK_TIMEOUT's default value within 
org/apache/flink/configuration/AkkaOptions.java from "10 s" to "100 s".

Best
Yun


From: Lukas Kircher 
Sent: Wednesday, July 18, 2018 14:47
To: user
Subject: Re: Cannot configure akka.ask.timeout

Hello,

does anybody have an idea what is going on? I have not yet found a solution.

Am I doing something wrong? Or is the 'akka.ask.timeout' parameter not related 
to the exception stated below?

Could somebody please take a look at this? More details can be found in the 
message prior to this.

akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]]
 after [1 ms]

Best regards,
Lukas


On 13. Jul 2018, at 12:24, Lukas Kircher 
mailto:lukaskirch...@gmail.com>> wrote:

Hello,

I have problems setting configuration parameters for Akka in Flink 1.5.0. When 
I run a job I get the exception listed below which states that Akka timed out 
after 1ms. I tried to increase the timeout by following the Flink 
configuration documentation. Specifically I did the following:

1) Passed a configuration to the Flink execution environment with 
`akka.ask.timeout` set to a higher value. I started this in Intellij.
2) Passed program arguments via the run configuration in Intellij, e.g. 
`-Dakka.ask.timeout:100s`
3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a local 
standalone cluster via start-cluster.sh. The setting is reflected in Flink's 
web interface.

However - despite explicit configuration the default setting seems to be used. 
The exception below states in each case that akka ask timed out after 1ms.

As my problem seems very basic I do not include an SSCCE for now but I can try 
to build one if this helps figuring out the issue.

--
[...]
Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Could not retrieve 
JobResult.
[...]
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
[...]
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]]
 after [1 ms]. Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:745)
[...]
--


Best regards and thanks for your help,
Lukas






Re: Serialization questions

2018-07-18 Thread zhangminglei
Hi, Flavio

> addDefaultKryoSerializer differs from registerTypeWithKryoSerializer because 
> addDefaultKryoSerializer use the passed serializer also for subclasses of the 
> configured class. Am I right? This is not very clear in the method's Javadoc…

I think it is not exactly a problem with flink. Instead of a kryo problem. For 
example, addDefaultKryoSerializer corresponding to the 
addDefaultSerializer(int[].class, IntArraySerializer.class) in kryo, whereas 
registerTypeWithKryoSerializer corresponding to the register(int.class, new 
IntSerializer()) in kryo.With register, you explicitly assign an id for that 
type plus serializer. The default serializer just tells kryo which serializer 
to use when this type has to be serialized, kryo will then implicitly register 
the serializer. And the advantage of using register would be [1]. when setting 
setRegistrationRequired(true), which is recommended (and will be the default in 
5.0), you'd have to register every occurring type explicitly.

> how to avoid that exception?
You can try below and do not make disableGenericTypes and see what happens.

env.registerTypeWithKryoSerializer(DateTime.class, 
JodaDateTimeSerializer.class);
env.registerTypeWithKryoSerializer(EntitonAtom.class, TBaseSerializer.class);
env.registerTypeWithKryoSerializer(EntitonQuad.class, TBaseSerializer.class);


[1] https://github.com/EsotericSoftware/kryo/blob/master/README.md#registration 
 

Cheers
Minglei








> 在 2018年7月17日,下午9:00,Flavio Pompermaier  写道:
> 
> Hi to all,
> I was trying to check whether our jobs are properly typed or not.
> I've started disabling generic types[1] in order to discover untyped 
> transformations and so I added the proper returns() to operators.
> 
> Unfortunately there are jobs where we serialize Thrift and DateTime objects, 
> so I need to properly configure the serializers in the ExecutionEnvironment:
> 
> env.registerTypeWithKryoSerializer(DateTime.class, 
> JodaDateTimeSerializer.class);
> env.getConfig().addDefaultKryoSerializer(EntitonAtom.class, 
> TBaseSerializer.class);
> env.getConfig().addDefaultKryoSerializer(EntitonQuad.class, 
> TBaseSerializer.class);
> 
> Those jobs don't work when I disable generic types and I get the following 
> exception:
> 
> Exception in thread "main" java.lang.UnsupportedOperationException: Generic 
> types have been 
> disabled in the ExecutionConfig and type xxx.EntitonAtom is treated as a 
> generic type.
> 
>  I have a couple of questions:
> addDefaultKryoSerializer differs from registerTypeWithKryoSerializer because 
> addDefaultKryoSerializer use the passed serializer also for subclasses of the 
> configured class. Am I right? This is not very clear in the method's 
> Javadoc...
> how to avoid that exception?
> Best,
> Flavio
> 
> [1] env.getConfig().disableGenericTypes();



Flink 1.5 job distribution over cluster nodes

2018-07-18 Thread Shachar Carmeli
 Hi,

We have 4 jobs with parallelism 3 that are running over 3 task manager with
4 slots per each , each task manager runs on a different VM ,

On Flink 1.3.2 the jobs were evenly distributed per node each job took one
task slot of each task manager .

 After upgrading to flink 1.5 , each job is running on a single task
manager (with a carry over to another if there are no slots left)

The jobs are not evenly by load which cause some task managers  to consume
more resources (CPU/memory) than other task managers.

Is there a way to return to an even distribution?

Thanks,

Shachar


Flink resource manager unable to connect to mesos after restart

2018-07-18 Thread Renjie Liu
Hi, all:

I'm testing flink 1.5.0 and find that flink mesos resource manager unable
to connect to mesos after restart. Have you seen this happenen?
-- 
Liu, Renjie
Software Engineer, MVAD


Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks

2018-07-18 Thread Gerard Garcia
Hi vino,

Seems that jobs id stay in /jobgraphs when we cancel them manually. For
example, after cancelling the job with id 75e16686cb4fe0d33ead8e29af131d09
the entry is still in zookeeper's path /flink/default/jobgraphs, but the
job disappeared from /home/nas/flink/ha/default/blob/.

That is the client log:

09:20:58.492 [main] INFO  org.apache.flink.client.cli.CliFrontend  -
Cancelling job 75e16686cb4fe0d33ead8e29af131d09.
09:20:58.503 [main] INFO
org.apache.flink.runtime.blob.FileSystemBlobStore  - Creating highly
available BLOB storage directory at file:///home/nas/flink/ha//default/blob
09:20:58.505 [main] INFO  org.apache.flink.runtime.util.ZooKeeperUtils  -
Enforcing default ACL for ZK connections
09:20:58.505 [main] INFO  org.apache.flink.runtime.util.ZooKeeperUtils  -
Using '/flink-eur/default' as Zookeeper namespace.
09:20:58.539 [main] INFO
o.a.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl  - Starting
09:20:58.543 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:zookeeper.version=
3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13
GMT
09:20:58.543 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:host.name=flink-eur-production1
09:20:58.543 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:java.version=1.8.0_131
09:20:58.544 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:java.vendor=Oracle Corporation
09:20:58.546 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:java.home=/opt/jdk/jdk1.8.0_131/jre
09:20:58.546 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:java.class.path=/opt/flink/flink-1.5.0/lib/commons-httpclient-3.1.jar:/opt/flink/flink-1.5.0/lib/flink-metrics-statsd-1.5.0.jar:/opt/flink/flink-1.5.0/lib/flink-python_2.11-1.5.0.jar:/opt/flink/flink-1.5.0/lib/fluency-1.8.0.jar:/opt/flink/flink-1.5.0/lib/gcs-connector-latest-hadoop2.jar:/opt/flink/flink-1.5.0/lib/hadoop-openstack-2.7.1.jar:/opt/flink/flink-1.5.0/lib/jackson-annotations-2.8.0.jar:/opt/flink/flink-1.5.0/lib/jackson-core-2.8.10.jar:/opt/flink/flink-1.5.0/lib/jackson-databind-2.8.11.1.jar:/opt/flink/flink-1.5.0/lib/jackson-dataformat-msgpack-0.8.15.jar:/opt/flink/flink-1.5.0/lib/log4j-1.2.17.jar:/opt/flink/flink-1.5.0/lib/log4j-over-slf4j-1.7.25.jar:/opt/flink/flink-1.5.0/lib/logback-classic-1.2.3.jar:/opt/flink/flink-1.5.0/lib/logback-core-1.2.3.jar:/opt/flink/flink-1.5.0/lib/logback-more-appenders-1.4.2.jar:/opt/flink/flink-1.5.0/lib/msgpack-0.6.12.jar:/opt/flink/flink-1.5.0/lib/msgpack-core-0.8.15.jar:/opt/flink/flink-1.5.0/lib/phi-accural-failure-detector-0.0.4.jar:/opt/flink/flink-1.5.0/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.5.0/lib/flink-dist_2.11-1.5.0.jar:::
09:20:58.546 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
09:20:58.546 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:java.io.tmpdir=/tmp
09:20:58.546 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:java.compiler=
09:20:58.547 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:os.name=Linux
09:20:58.547 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:os.arch=amd64
09:20:58.547 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:os.version=4.9.87--std-ipv6-64
09:20:58.547 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:user.name=root
09:20:58.547 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:user.home=/root
09:20:58.547 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client
environment:user.dir=/opt/flink/flink-1.5.0/bin
09:20:58.548 [main] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Initiating
client connection, connectString=10.1.1.5:2181,10.1.1.6:2181,10.1.1.7:2181
sessionTimeout=6
watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@4a003cbe
09:20:58.555 [main-SendThread(10.1.1.5:2181)] WARN
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL
configuration failed: javax.security.auth.login.LoginException: No JAAS
configuration section named 'Client' was found in specified JAAS
configuration file: '/tmp/jaas-9143038863636945274.conf'. Will continue
connection to Zookeeper server without SASL authentication, if Zookeeper
server allows it.
09:20:58.556 [main-SendThread(10.1.1.5:2181)] INFO
o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening
socket connection to server 10.1.1.5/10.1.1.5:2181
09:20:58.556 [main-EventThread] ERROR

Re: Flink CLI properties with HA

2018-07-18 Thread Sampath Bhat
Vino, I'm not getting any error but my suspicion was that if I dont specify
this `high-availability.storageDir` property in flink CLI side then the CLI
will not be able to submit job to flink cluster(HA enabled). But if provide
this property in CLI side the job submission will be successful even though
the CLI cannot access the path mentioned in `high-availability.storageDir`.
So I wanted to understand the underlying implementation.

Till, Thank you for the reply. It clarified my doubt.

On Tue, Jul 17, 2018 at 6:03 PM, Till Rohrmann  wrote:

> Hi Sampath,
>
> technically the client does not need to know the
> `high-availability.storageDir` to submit a job. However, due to how we
> construct the ZooKeeperHaServices it is still needed. The reason behind
> this is that we use the same services for the server and the client. Thus,
> the implementation needs to know the storageDir in both cases. The way it
> should be done is to split the HighAvailabilityServices up into client and
> server services. The former would then not depend on
> `high-availability.storageDir`.
>
> Cheers,
> Till
>
> On Tue, Jul 17, 2018 at 1:31 PM vino yang  wrote:
>
>> Hi Sampath,
>>
>> It seems Flink CLI for standalone would not access
>> *high-availability.storageDir.*
>>
>> What's the exception stack trace in your environment?
>>
>> Thanks, vino.
>>
>> 2018-07-17 15:08 GMT+08:00 Sampath Bhat :
>>
>>> Hi vino
>>>
>>> Should the flink CLI have access to the path mentioned in
>>> *high-availability.storageDir*?
>>> If my flink cluster is on set of machines and i submit my job from flink
>>> CLI from another independent machine by giving necessary details will the
>>> CLI try to access *high-availability.storageDir *path?
>>>
>>> I'm aware of the fact that flink client will connect to zookeeper to get
>>> leader address and necessary information for job submission but my
>>> confusion is with *high-availability.storageDir* and its necessity in
>>> flink CLI configuration.
>>>
>>> On Mon, Jul 16, 2018 at 2:44 PM, vino yang 
>>> wrote:
>>>
 Hi Sampath,

 Flink CLI need to retrieve the JobManager leader address, so it need
 to access the HA specific configuration. Because if based on Zookeeper to
 implement the HA, the leader address information will fetch from Zookeeper.

 The main use of config item *high-availability.storageDir* is storage
 (Job graph, checkpoint and so on). Actually, the real data is stored under
 this path which used to recover purpose, zookeeper just store a state
 handle.

 ---
 Thanks.
 vino.


 2018-07-16 15:28 GMT+08:00 Sampath Bhat :

>
> -- Forwarded message --
> From: Sampath Bhat 
> Date: Fri, Jul 13, 2018 at 3:18 PM
> Subject: Flink CLI properties with HA
> To: user 
>
>
> Hello
>
> When HA is enabled in the flink cluster and if I've to submit job via
> flink CLI then in the flink-conf.yaml of flink CLI should contain this
> properties -
> high-availability: zookeeper
> high-availability.cluster-id: flink
> high-availability.zookeeper.path.root: flink
> high-availability.storageDir: 
> high-availability.zookeeper.quorum: 
>
> What is the need of high-availability.storageDir for flink CLI. Does
> this mean that even flink client should be able to access the mentioned
> path or is it some check being done on the property name?
>
> Without these properties flink cli will not be able to submit job to
> flink cluster when HA is enabled.
>
>

>>>
>>


Why data didn't enter the time window in EventTime mode

2018-07-18 Thread Soheil Pourbafrani
Hi,

In a datastream processing problem, the source generated data every 8
millisecond and timestamp is a field of the data. In default Flink time
behavior data enter the time window but when I set Flink time to EventTime
it will output nothing! Here is the code:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

SingleOutputStreamOperator> res =
aggregatedTuple
.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor>(Time.milliseconds(8)) {

@Override
public long extractTimestamp(Tuple3 element) {
return element.f1 ;
}
}).keyBy(1).timeWindow(Time.milliseconds(8))
.allowedLateness(Time.milliseconds(3))
.sideOutputLateData(lateOutputTag)
.reduce(processing...);
DataStream> lateData =
res.getSideOutput(lateOutputTag);
res.print();

What is the problem with my code?
According to the Flink documents, my understanding about EventTime is that
for example in case of time window when a new data received it start a new
(logical window) based on new data event timestamp and wait 8 milliseconds
(according to my code) to see if any other data with the same key received
or not and after 8 millisecond (from timestamp of the first element of the
window) it will be triggered. Since data source generated data in a
constant periodic interval, I set a watermarck of  8, too. Is my
understanding about Flink window in EventTime correct?


Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks

2018-07-18 Thread vino yang
Hi Gerard,

>From you provide information, you mean the path in Zookeeper "/jobgraphs"
exists more jobs than you submitted?
And can not be restarted because blob files can not be find?

Can you provide more details, about the stack trace, log and which version
of Flink? Normally, the jobgraph can not be added to Zookeeper except
submit job manually.

Thanks, vino.

2018-07-16 21:19 GMT+08:00 gerardg :

> Hi,
>
> Our deployment consists of a standalone HA cluster of 8 machines with an
> external Zookeeper cluster. We have observed several times that when a
> jobmanager fails and a new one is elected, the new one tries to restart
> more jobs than the ones that were running and since it can't find some
> files, it fails and gets stuck in a restart loop. That is the error that we
> see in the logs:
>
>
>
> These are the contents of /home/nas/flink/ha/default/blob/:
>
>
>
> We've checked zookeeper and there are actually a lot of jobgraphs in
> /flink/default/jobgraphs
>
>
>
> There were only three jobs running so neither zookeeper nor the flink 'ha'
> folder seems to have the correct number of jobgraphs stored.
>
> The only way we have to solve this is to remove everything at path /flink
> in
> zookeeper and the 'ha' flink folder and restart the jobs manually.
>
> I'll try to monitor if some action (e.g. we have been canceling and
> restoring jobs from savepoints quite often lately) leaves an entry in
> zookeepers path /flink/default/jobgraphs of a job that is not running but
> maybe someone can't point us to some configuration problem that could cause
> this behavior.
>
> Thanks,
>
> Gerard
>
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Cannot configure akka.ask.timeout

2018-07-18 Thread Lukas Kircher
Hello,

does anybody have an idea what is going on? I have not yet found a solution.

Am I doing something wrong? Or is the 'akka.ask.timeout' parameter not related 
to the exception stated below?

Could somebody please take a look at this? More details can be found in the 
message prior to this.

akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583
 ]] 
after [1 ms]

Best regards,
Lukas


> On 13. Jul 2018, at 12:24, Lukas Kircher  wrote:
> 
> Hello,
> 
> I have problems setting configuration parameters for Akka in Flink 1.5.0. 
> When I run a job I get the exception listed below which states that Akka 
> timed out after 1ms. I tried to increase the timeout by following the 
> Flink configuration documentation. Specifically I did the following:
> 
> 1) Passed a configuration to the Flink execution environment with 
> `akka.ask.timeout` set to a higher value. I started this in Intellij.
> 2) Passed program arguments via the run configuration in Intellij, e.g. 
> `-Dakka.ask.timeout:100s`
> 3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a local 
> standalone cluster via start-cluster.sh. The setting is reflected in Flink's 
> web interface.
> 
> However - despite explicit configuration the default setting seems to be 
> used. The exception below states in each case that akka ask timed out after 
> 1ms.
> 
> As my problem seems very basic I do not include an SSCCE for now but I can 
> try to build one if this helps figuring out the issue.
> 
> --
> [...]
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Could not retrieve 
> JobResult.
> [...]
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)
>   at 
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
>   at 
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> [...]
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583
>  
> ]]
>  after [1 ms]. Sender[null] sent message of type 
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>   at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>   at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>   at 
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>   at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>   at java.lang.Thread.run(Thread.java:745)
> [...]
> --
> 
> 
> Best regards and thanks for your help,
> Lukas
> 
> 
> 



Re: FlinkCEP and scientific papers ?

2018-07-18 Thread Till Rohrmann
You are right Vino,

the initial implementation was based on the above mentioned paper.

Cheers,
Till

On Tue, Jul 17, 2018 at 5:34 PM vino yang  wrote:

> Hi Esa,
>
> AFAIK, the earlier Flink CEP refers to the Paper 《Efficient Pattern
> Matching over Event Streams》[1]. Flink absorbed  two major idea from this
> paper:
>
> 1. NFA-b model on event stream
> 2. a shared versioned match buffer which is a optimized data structure
>
> To Till and Chesnay:
>
> Did I missed anything when as time goes on and the development of Flink?
> If yes, please give your additional remarks.
>
> [1]: https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf
>
> Thanks, vino.
>
> 2018-07-17 22:01 GMT+08:00 Esa Heikkinen :
>
>> Hi
>>
>>
>>
>> I don’t know this the correct forum to ask, but are there exist some good
>> scientific papers about FlinkCEP (Complex Event Processing) ?
>>
>>
>>
>> I know Flink is based to Stratosphere, but how is it FlinkCEP ?
>>
>>
>>
>> BR Esa
>>
>
>