AsyncFunction used twice with AsyncDataStream.unorderedWait - 2nd function's asyncInvoke not getting called

2018-07-26 Thread Vijay Balakrishnan
Hi,

I have two AsyncFunctions, SampleCopyAsyncFunction and SampleSinkAsyncFunction,
each called with AsyncDataStream.unorderedWait. The 1st
AsyncDataStream.unorderedWait's SampleCopyAsyncFunction.asyncInvoke gets called
properly, but the 2nd SampleSinkAsyncFunction.asyncInvoke never gets called
(though its open and close methods are called). Is there any way for me to get
the 2nd asyncInvoke called? I use an Executors.newFixedThreadPool(..) within
each AsyncFunction.




TIA





Here is the code:

    AsyncFunction cameraWithCubeAsyncFunction =
            new SampleCopyAsyncFunction(shutdownWaitTS, inputFile, options, nThreads);

    DataStream cameraWithCubeDataStreamAsync =
            AsyncDataStream.unorderedWait(keyedByCamCameraStream, cameraWithCubeAsyncFunction,
                            timeout, TimeUnit.MILLISECONDS, nCapacity)
                    .setParallelism(parallelCamTasks); //.startNewChain()

    DataStream cameraWithCubeDataStream =
            cameraWithCubeDataStreamAsync.keyBy((cameraWithCube) ->
                    cameraWithCube.cameraKey != null ? cameraWithCube.cameraKey.getTs() : new Object());

    String uuid = UUID.randomUUID().toString();

    DataStream<Tuple2<...>> enrichedCameraFeed = inputMetadataDataStream
            .connect(cameraWithCubeDataStream)
            .flatMap(new SyncLatchFunction(outputFile, outputPath, uuid))
            .uid("connect2Streams")
            .setParallelism(1);

    AsyncFunction<Tuple2<...>, Tuple2<...>> cubeSinkAsyncFunction =
            new SampleSinkAsyncFunction(shutdownWaitTS, outputPath, options, nThreads, uuid);

    DataStream<Tuple2<...>> enrichedCameraFeedSinkAsync =
            AsyncDataStream.unorderedWait(enrichedCameraFeed, cubeSinkAsyncFunction,
                            timeout, TimeUnit.MILLISECONDS, nCapacity)
                    .setParallelism(parallelCubeTasks)
                    .uid("Read-Image-Async"); // <== asyncInvoke never gets called for 2nd AsyncFunction

    DataStream<Tuple2<...>> enrichedCameraFeedSinkAsyncDataStream =
            enrichedCameraFeedSinkAsync.keyBy((tuple2) ->
                    tuple2.f0.inputMetadataKey != null ? tuple2.f0.inputMetadataKey.getTs() : new Object());

    // enrichedCameraFeedSinkAsyncDataStream.print(); // <== this doesn't work

    enrichedCameraFeedSinkAsyncDataStream
            .addSink(new CubeProcessingSink(options, outputPath, uuid)) //, shutdownWaitTS
            .setParallelism(parallelCubeTasks)
            .uid("Cube-Sink");
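
For reference, each AsyncFunction has roughly this shape. This is a minimal sketch only
(not the actual SampleCopyAsyncFunction or SampleSinkAsyncFunction), using the Flink 1.5
async API, with a hypothetical element type and a placeholder for the blocking I/O. The
intent is that asyncInvoke returns immediately, the work runs on the executor, and the
ResultFuture is always completed (also on failure); otherwise the operator's in-flight
capacity fills up and everything downstream of it stalls:

    import java.util.Collections;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    // Sketch: the element type CameraWithCube and the I/O step are placeholders.
    public class SampleAsyncFunctionSketch extends RichAsyncFunction<CameraWithCube, CameraWithCube> {

        private final int nThreads;
        private transient ExecutorService executor;

        public SampleAsyncFunctionSketch(int nThreads) {
            this.nThreads = nThreads;
        }

        @Override
        public void open(Configuration parameters) {
            executor = Executors.newFixedThreadPool(nThreads);
        }

        @Override
        public void asyncInvoke(CameraWithCube input, ResultFuture<CameraWithCube> resultFuture) {
            // Return immediately; do the blocking work on the executor thread.
            executor.submit(() -> {
                try {
                    // ... blocking I/O would go here ...
                    resultFuture.complete(Collections.singleton(input));
                } catch (Exception e) {
                    // Completing exceptionally avoids leaking capacity slots on errors.
                    resultFuture.completeExceptionally(e);
                }
            });
        }

        @Override
        public void close() {
            executor.shutdown();
        }
    }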


Re: Flink resource manager unable to connect to mesos after restart

2018-07-26 Thread Renjie Liu
OK, I'll close 7470

On Thu, Jul 26, 2018 at 11:25 PM Gary Yao  wrote:

> Hi,
>
> Sorry for the late reply. I have seen that you debugged this already and
> created FLINK-9936. Thank you for looking into the issue. I think your
> conclusions are correct. I just wanted to note that there is an even older
> ticket describing the same problem:
>
> https://issues.apache.org/jira/browse/FLINK-7470
>
> One of them should be closed.
>
> Best,
> Gary
>
>
> On Thu, Jul 19, 2018 at 10:45 AM, Renjie Liu 
> wrote:
>
>> Attached is job manager's log.
>>
>>
>> On Thu, Jul 19, 2018 at 4:38 PM Renjie Liu 
>> wrote:
>>
>>> Hi, Gary:
>>>
>>> It can be reproduced stably; just kill the job manager and restart
>>> it.
>>>
>>> Attached is the jobmanager's log, but I don't find anything valuable since it
>>> just keeps reporting that it is unable to connect to the mesos master.
>>>
>>> On Thu, Jul 19, 2018 at 4:55 AM Gary Yao  wrote:
>>>
 Hi,

 If you are able to reproduce this reliably, can you post the
 jobmanager logs?

 Best,
 Gary
 On Wed, Jul 18, 2018 at 10:33 AM, Renjie Liu 
 wrote:

> Hi, all:
>
> I'm testing Flink 1.5.0 and find that the Flink mesos resource manager is
> unable to connect to mesos after a restart. Have you seen this happen?
> --
> Liu, Renjie
> Software Engineer, MVAD
>
 --
>>> Liu, Renjie
>>> Software Engineer, MVAD
>>>
>> --
>> Liu, Renjie
>> Software Engineer, MVAD
>>
>
> --
Liu, Renjie
Software Engineer, MVAD


Committing Kafka Transactions during Savepoint

2018-07-26 Thread Scott Kidder
I recently began using the exactly-once processing semantic with the Kafka
0.11 producer in Flink 1.4.2. It's been working great!

Are Kafka transactions committed when creating a Flink savepoint? How does
this affect the recovery behavior in Flink if, before the completion of the
next checkpoint, the application is restarted and restores from a
checkpoint taken before the savepoint? It seems like this might lead to the
Kafka producer writing a message multiple times with different committed
Kafka transactions.
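
For reference, the producer is wired up roughly like this. This is a minimal sketch
rather than the exact job; broker, topic and interval values are illustrative, and it
assumes the Flink 1.4 Kafka 0.11 connector API:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

    public class ExactlyOnceKafkaSinkSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Kafka transactions are tied to checkpoints, so checkpointing must be enabled.
            env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // illustrative broker
            // Should cover the largest expected gap between completed checkpoints.
            props.setProperty("transaction.timeout.ms", "900000");

            DataStream<String> input = env.fromElements("a", "b", "c"); // placeholder source

            input.addSink(new FlinkKafkaProducer011<>(
                    "output-topic", // illustrative topic
                    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                    props,
                    FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));

            env.execute("exactly-once-kafka-sketch");
        }
    }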

--
Scott Kidder


Flink Cluster and Pipeline Version Compatibility?

2018-07-26 Thread jlist9
I was trying to find a compatibility table between various versions of
Flink clusters and pipeline jars but haven't run into one so far. Is it the
case that pipeline jars must be built against the same Flink version as the
cluster they'll be running on? Or is there some backward compatibility? If
it's already documented/discussed somewhere, a link would be appreciated.

Jack


Re: Flink resource manager unable to connect to mesos after restart

2018-07-26 Thread Gary Yao
Hi,

Sorry for the late reply. I have seen that you debugged this already and
created FLINK-9936. Thank you for looking into the issue. I think your
conclusions are correct. I just wanted to note that there is an even older
ticket describing the same problem:

https://issues.apache.org/jira/browse/FLINK-7470

One of them should be closed.

Best,
Gary

On Thu, Jul 19, 2018 at 10:45 AM, Renjie Liu 
wrote:

> Attached is job manager's log.
>
>
> On Thu, Jul 19, 2018 at 4:38 PM Renjie Liu 
> wrote:
>
>> Hi, Gary:
>>
>> It can be reproduced stably; just kill the job manager and restart
>> it.
>>
>> Attached is the jobmanager's log, but I don't find anything valuable since it
>> just keeps reporting that it is unable to connect to the mesos master.
>>
>> On Thu, Jul 19, 2018 at 4:55 AM Gary Yao  wrote:
>>
>>> Hi,
>>>
>>> If you are able to reproduce this reliably, can you post the jobmanager
>>> logs?
>>>
>>> Best,
>>> Gary
>>> On Wed, Jul 18, 2018 at 10:33 AM, Renjie Liu 
>>> wrote:
>>>
 Hi, all:

 I'm testing Flink 1.5.0 and find that the Flink mesos resource manager is
 unable to connect to mesos after a restart. Have you seen this happen?
 --
 Liu, Renjie
 Software Engineer, MVAD

>>> --
>> Liu, Renjie
>> Software Engineer, MVAD
>>
> --
> Liu, Renjie
> Software Engineer, MVAD
>


Re: Questions on Unbounded number of keys

2018-07-26 Thread Andrey Zagrebin
Hi Chang Liu,

The unbounded nature of the stream, keyed or not, should not by itself lead to
out of memory.

Flink's parallel keyed operator instances are fixed in number (the parallelism),
and each instance just processes some range of the keyed elements; in your
example that is a subrange of the session ids.

The keyed elements being processed (HTTP requests) are objects created when they
enter the pipeline and garbage collected after having been processed in
streaming fashion.

If they arrive very rapidly, this can lead to high back pressure propagating
from downstream operators back to upstream ones; buffers can become full and the
pipeline stops or slows down processing external inputs. That usually means your
pipeline is under-provisioned.

The only accumulated data comes from state (windows, user state etc.), so if you
control its memory consumption, as Till described, there should be no other
source of out of memory.
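
To make the timer-based cleanup Till describes below more concrete, here is a minimal
sketch of a ProcessFunction on a keyed stream that counts requests per session and drops
its per-key state a fixed time after the key is first seen. The class name, state name
and TTL are hypothetical, and HttpRequest is the element type from your example:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Sketch: per-session request count, cleared a fixed time after the key first appears,
    // so state does not accumulate forever.
    public class SessionCountWithCleanup extends ProcessFunction<HttpRequest, Long> {

        private static final long TTL_MS = 30 * 60 * 1000L; // illustrative 30 minutes

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(HttpRequest req, Context ctx, Collector<Long> out) throws Exception {
            Long current = count.value();
            if (current == null) {
                // First element for this session: schedule the cleanup timer.
                // As Till notes, each registered timer itself occupies state until it fires.
                ctx.timerService().registerProcessingTimeTimer(
                        ctx.timerService().currentProcessingTime() + TTL_MS);
                count.update(1L);
            } else {
                count.update(current + 1);
            }
            out.collect(count.value());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
            // Drop everything kept for this key.
            count.clear();
        }
    }

It would be applied on the keyed stream from your example via .process(...), e.g.
requestStream.keyBy(_.sessionId).process(new SessionCountWithCleanup()).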

Cheers,
Andrey

> On 25 Jul 2018, at 19:06, Chang Liu  wrote:
> 
> Hi Till,
> 
> Thanks for your reply. But I think maybe I did not make my question clear. My
> question is not about whether the State within each keyed operator instance
> will run out of memory. My question is about whether the unlimited keyed
> operator instances themselves will run out of memory.
>
> So to reply to your answers: regardless of using different State backends or
> regularly cleaning up the States (which is exactly what I am doing), that does
> not concern the number of keyed operator instances.
>
> I would like to know:
> - Will the number of keyed operator instances (Java objects?) grow unbounded?
>   If so, will they run out of memory? This is not actually related to the
>   memory used by the keyed State inside.
> - If not, then how does Flink manage these multiple keyed operator instances?
>
> I think this needs more knowledge about how Flink works internally, i.e. how
> keyed operator instances are created, maintained and destroyed. That's why I
> would like your help understanding this.
> 
> Many Thanks.
> 
> Best regards/祝好,
> 
> Chang Liu 刘畅
> 
> 
>> On 24 Jul 2018, at 14:31, Till Rohrmann wrote:
>> 
>> Hi Chang Liu,
>> 
>> if you are dealing with an unlimited number of keys and keep state around 
>> for every key, then your state size will keep growing with the number of 
>> keys. If you are using the FileStateBackend which keeps state in memory, you 
>> will eventually run into an OutOfMemoryException. One way to solve/mitigate 
>> this problem is to use the RocksDBStateBackend which can go out of core.
>> 
>> Alternatively, you would need to clean up your state before you run out of 
>> memory. One way to do this is to register for every key a timer which clears 
>> the state. But this only works if you don't amass too much state data before 
>> the timer is triggered. If you wish this solution is some kind of a poor 
>> man's state TTL. The Flink community is currently developing a proper 
>> implementation of it which does not rely on additional timers (which 
>> increases the state footprint) [1].
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-9510 
>> 
>> 
>> Cheers,
>> Till
>> 
>> On Tue, Jul 24, 2018 at 10:11 AM Chang Liu wrote:
>> Dear All,
>> 
>> I have questions regarding the keys. In general, the questions are:
>> what happens if I am doing keyBy based on an unlimited number of keys? How is
>> Flink managing each KeyedStream under the hood? Will I get a memory
>> overflow, for example, if every KeyedStream associated with a specific key is
>> taking a certain amount of memory?
>> BTW, I think it is fair to say that I have to clear my KeyedState so that
>> the memory used by these States is cleaned up regularly. But still, I am
>> wondering: even though I am regularly cleaning up State memory, what
>> happens to the memory used by the KeyedStream itself, if there is any? And
>> will it explode?
>> 
>> Let me give an example to understand it clearly. Let's say we have a
>> 
>>  val requestStream: DataStream[HttpRequest]
>> 
>> which is a stream of HTTP requests. And by using the session ID as the key, 
>> we can obtain a KeyedStream per single session, as follows:
>> 
>> val streamPerSession: KeyedStream[HttpRequest] = 
>> requestStream.keyBy(_.sessionId)
>> 
>> However, the session IDs are actually hashcodes generated randomly by the
>> Web service/application, which means the number of sessions is unlimited
>> (which is reasonable, because every time a user opens the application or
>> logs in, he/she will get a new unique session).
>> 
>> Then, the question is: will Flink eventually run out of memory because the
>> number of sessions is unlimited (and because we are keying by the session
>> ID)?
>> If so, how can we properly manage this situation?
>> If not, could you help me understand WHY?
>> Let's also assume that we are regularly clearing the KeyedState, so the
>> memory used by the

Re: Checkpointing not happening in Standalone HA mode

2018-07-26 Thread vino yang
Hi Vinay,

Did you call the specific config API described in this documentation [1]?

Can you share your job program and the JM log? Does the JM log contain
messages matching this pattern: "Triggering checkpoint {} @ {} for job {}."?

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing
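
For reference, a minimal sketch of enabling checkpointing programmatically; the host,
port, jar path and interval below are placeholders, not taken from your job:

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointingSketch {
        public static void main(String[] args) throws Exception {
            // Placeholders: JobManager host/port and the path to the job jar.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                    "jobmanager-host", 6123, "/path/to/job.jar");

            // Trigger a checkpoint every 10 seconds with exactly-once guarantees.
            env.enableCheckpointing(10_000);
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

            env.fromElements(1, 2, 3).print(); // trivial pipeline, just to have something running
            env.execute("checkpointing-sketch");
        }
    }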

Thanks, vino.

2018-07-25 19:43 GMT+08:00 Chesnay Schepler :

> Can you provide us with the job code?
>
> I assume that checkpointing runs properly if you submit the same job to a
> normal cluster?
>
>
> On 25.07.2018 13:15, Vinay Patil wrote:
>
> No error in the logs. That is why I am not able to understand why
> checkpoints are not getting triggered.
>
> Regards,
> Vinay Patil
>
>
> On Wed, Jul 25, 2018 at 4:44 PM Vinay Patil 
> wrote:
>
>> Hi Chesnay,
>>
>> No error in the logs. That is why I am not able to understand why
>> checkpoints are not getting triggered.
>>
>> Regards,
>> Vinay Patil
>>
>>
>> On Wed, Jul 25, 2018 at 4:36 PM Chesnay Schepler 
>> wrote:
>>
>>> Please check the job- and taskmanager logs for anything suspicious.
>>>
>>> On 25.07.2018 12:33, Vinay Patil wrote:
>>>
>>> Hi,
>>>
>>> I am starting the cluster using a bootstrap application in which I am
>>> calling the Job Manager and Task Manager main classes to form the cluster.
>>> The HA cluster is formed correctly and I am able to submit jobs to this
>>> cluster using RemoteExecutionEnvironment, but when I enable checkpointing
>>> in code I do not see any checkpoints triggered on the Flink UI.
>>>
>>> Am I missing any configuration that needs to be set on the
>>> RemoteExecutionEnvironment for checkpointing to work?
>>>
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>>
>>>
>


How to connect more than 2 heterogeneous Streams!!

2018-07-26 Thread Puneet Kinra
Hi

Is there a way to connect more than 2 streams with different stream schemas?
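
For example, something like chaining connect (since connect only pairs two streams at a
time). This is just a rough sketch with placeholder types A, B, C and a common output
type, not working code from my job:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.CoMapFunction;

    // Placeholder types: A, B, C are the three different schemas, Common is a type
    // they can all be mapped into (Common.fromA/fromB/fromC are hypothetical helpers).
    DataStream<Common> ab = streamA.connect(streamB)
            .map(new CoMapFunction<A, B, Common>() {
                @Override public Common map1(A a) { return Common.fromA(a); }
                @Override public Common map2(B b) { return Common.fromB(b); }
            });

    // Fold the third stream in by connecting again.
    DataStream<Common> abc = ab.connect(streamC)
            .map(new CoMapFunction<Common, C, Common>() {
                @Override public Common map1(Common x) { return x; }
                @Override public Common map2(C c) { return Common.fromC(c); }
            });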

-- 
Cheers,

Puneet Kinra

Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com

e-mail: puneet.ki...@customercentria.com


Re: override jvm params

2018-07-26 Thread vino yang
Hi Cussac,

Flink on Yarn supports dynamic properties. Can you try this:
-yD <key>=<value> ?

The implementation is here[1][2].

[1]:
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L151
[2]:
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L198

Thanks, vino.


2018-07-25 20:27 GMT+08:00 Cussac, Franck :

> Hi Hequn,
>
>
>
> Thanks for your answer. I just tested and it doesn’t work.
>
>
>
> I'm using PureConfig to parse my conf files. With Java I can override any
> argument using the -D<key>=<value> syntax. How can I do the same with Flink
> in YARN mode?
>
>
>
> Franck.
>
>
>
>
>
> From: Hequn Cheng [mailto:chenghe...@gmail.com]
> Sent: Wednesday, 25 July 2018 14:04
> To: Cussac, Franck 
> Cc: user 
> Subject: Re: override jvm params
>
>
>
> Hi Cussac,
>
> If I understand correctly, you want to pass rules.consumer.topic=test
> and rules.consumer.topic=test to the Flink JVM.
>
> I think you can try:
>
> flink run -m $HOSTPORT -yD rules.consumer.topic=test
> -yD rules.consumer.topic=test
>
>
>
> Hope this helps.
>
> Hequn
>
>
>
> On Wed, Jul 25, 2018 at 3:26 PM, Cussac, Franck <
> franck.cus...@ext.bleckwen.ai> wrote:
>
> Hi,
>
>
>
> Following the documentation, I want to use the -yD option to override some
> params in my conf, like this:
>
>
>
> flink run -m $HOSTPORT -yD 
> "env.java.opts.taskmanager=-Drules.consumer.topic=test"
> -yD "env.java.opts.jobmanager=-Drules.consumer.topic=test" myjar mymain
>
>
>
> but it is just ignored. Nothing happened. But if I run with Java in my IDE
> with:
>
> -Drules.consumer.topic=test
>
> as a JVM parameter, it works perfectly.
>
>
>
> What do I have to do to override my params with YARN and Flink?
>
>
>
>
>
> Best regards,
>
> Franck Cussac.
>
>
>
>
>