Avoiding deadlock with iterations

2018-01-24 Thread Ken Krugler
Hi all,

We’ve run into deadlocks with two different streaming workflows that have 
iterations.

In both cases, the issue is with fan-out: if any operation in the loop can emit 
more records than it consumes, eventually a network buffer fills up, and then 
every operator in the iteration loop is blocked.

One pattern we can use, when the operator that’s causing the fan-out has the 
ability to decide how much to emit, is to have it behave as an async function, 
emitting from a queue with multiple threads. If threads start blocking because 
of back pressure, then the queue begins to fill up, and the function can 
throttle back how much data it queues up. So this gives us a small (carefully 
managed) data reservoir we can use to avoid the deadlock.
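
To make the pattern concrete, here's a rough, simplified sketch of the reservoir idea 
(plain Java around a RichFlatMapFunction, not our actual operator; the bookkeeping a 
real operator needs to drain the queue on checkpoints and shutdown is left out):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class ThrottledFanOut extends RichFlatMapFunction<String, String> {

    private transient BlockingQueue<String> reservoir;   // the small, managed reservoir
    private transient ExecutorService workers;

    @Override
    public void open(Configuration parameters) {
        reservoir = new ArrayBlockingQueue<>(1_000);
        workers = Executors.newFixedThreadPool(2);
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // Fan-out happens on worker threads; put() blocks once the reservoir is
        // full, so a stalled iteration stops the expansion instead of filling
        // the network buffers.
        workers.submit(() -> {
            try {
                for (int i = 0; i < fanOutFactor(value); i++) {
                    reservoir.put(value + "-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Emit whatever has accumulated so far; collect() is where back pressure
        // from the loop shows up. Anything still queued is emitted on later calls.
        for (String r = reservoir.poll(); r != null; r = reservoir.poll()) {
            out.collect(r);
        }
    }

    // Application-specific expansion factor (hypothetical).
    private int fanOutFactor(String value) {
        return 100;
    }

    @Override
    public void close() {
        if (workers != null) {
            workers.shutdownNow();
        }
    }
}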

Is there a better approach? I didn’t see any way to determine how “full” the 
various network buffers are, and use that for throttling. Plus there’s the 
issue of partitioning, where it would be impossible in many cases to know the 
impact of a record being emitted. So even if we could monitor buffers, I don’t 
think it’s a viable solution.

Thanks,

— Ken


http://about.me/kkrugler 
+1 530-210-6378



Re: Understanding Restart Strategy

2018-01-24 Thread Ashish Pokharel
FYI,

I think I have gotten to the bottom of this situation. For anyone who might run 
into it, hopefully my observations will help.

In my case, it had nothing to do with the Flink Restart Strategy; it was doing its 
thing as expected. The real issue was the Kafka Producer timeout counters. As I 
mentioned in the other thread, we have a capacity issue with our Kafka cluster that 
ends up causing timeouts in our Flink applications (we do have throttling in place 
in Kafka to manage it better, but unfortunately we still run into timeouts pretty 
often). 

We had set our Kafka Producer retries to 10, and it seems that retry counter 
never gets reset. So over the life of an app, once it hit 10 timeouts, it basically 
couldn't start and went to a Failed state. I have yet to dig into whether this 
can be solved from the Flink Kafka wrapper or not. For now we have set the 
retries to 0, and hopefully this situation will not happen again.
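
For reference, this is roughly how we pass the producer settings now (a minimal 
sketch - the brokers, topic, and surrounding stream are placeholders, and the sink 
class matches the 0.10 connector we use; adjust for your connector version):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaSinkConfig {

    public static void addKafkaSink(DataStream<String> stream) {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
        // Standard Kafka producer settings; retries=0 avoids the cumulative retry
        // behavior described above, at the cost of failing fast on a timeout.
        producerProps.setProperty("retries", "0");
        producerProps.setProperty("request.timeout.ms", "30000");

        stream.addSink(new FlinkKafkaProducer010<>(
                "output-topic",               // placeholder topic
                new SimpleStringSchema(),
                producerProps));
    }
}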

If anyone has similar observations, please feel free to share.

Thanks, Ashish

> On Jan 19, 2018, at 2:43 PM, ashish pok  wrote:
> 
> Team,
> 
> Hopefully, this is a quick one. 
> 
> We have setup restart strategy as follows in pretty much all of our apps:
> 
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 
> Time.of(30, TimeUnit.SECONDS)));
> 
> This seems pretty straight-forward. App should retry starting 10 times every 
> 30 seconds - so about 5 minutes. Either we are not understanding this or it 
> seems inconsistent. Some of the applications restart and come back fine on 
> issues like Kafka timeout (which I will come back to later) but in some cases 
> same issues pretty much shuts the app down. 
> 
> My first guess here was that total count of 10 is not reset after App 
> recovered normally. Is there a need to manually reset the counter in an App? 
> I doubt Flink would be treating it like a counter that spans the life of an 
> App instead of resetting on successful start-up - but not sure how else to 
> explain the behavior.
> 
> Along the same lines, what actually constitutes a "restart"? Our Kafka 
> cluster has known performance bottlenecks during certain times of day that we 
> are working to resolve. I do notice Kafka producer timeouts quite a few times 
> during these periods. When the App hits these timeouts, it does recover fine, but I 
> don't necessarily see the entire application restarting, as I don't see the bootstrap logs 
> of my App. Does something like this count as a restart of the App from the Restart 
> Strategy perspective, as opposed to things like app crashes/YARN killing the 
> application etc., where the App is actually restarted from scratch?
> 
> We are really liking Flink, just need to hash out these operational issues to 
> make it prime time for all streaming apps we have in our cluster.
> 
> Thanks,
> 
> Ashish



Data exchange between tasks (operators/sources) at streaming api runtime

2018-01-24 Thread Ishwara Varnasi
Hello,
I have a scenario with two sources: one is a source of a fixed list of ids used
for preloading (caching certain info, which is slow), and the second one is the
Kafka consumer. I need to start the Kafka consumer only after the first source
completes, so I need a mechanism to let the Kafka consumer know that it can
start consuming messages. How can I achieve this?
thanks
Ishwara Varnasi


Re: Kafka Producer timeout causing data loss

2018-01-24 Thread Ashish Pokharel
Fabian,

Thanks for your feedback - very helpful as usual !

This is sort of becoming a huge problem for us right now because of our Kafka 
situation. For some reason I missed this detail going through the docs. We are 
definitely seeing a heavy dose of data loss when Kafka timeouts happen. 

We are actually on version 1.4 - I'd be interested to understand whether anything 
can be done in 1.4 to prevent this scenario.

One other thought I had was the ability to invoke “checkpointing before restart / 
recovery” - meaning I don’t necessarily need to checkpoint periodically, but I do 
want to make sure that on an explicit restart / rescheduling like this we have a 
decent “last known” state. Not sure if this is currently doable.
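
What we are considering as a stop-gap is periodic checkpointing with the producer 
flushing on checkpoints, so a restart goes back to the last completed checkpoint 
instead of dropping in-flight records. A rough sketch of what I mean (assuming the 
0.10 connector; the setter names come from the producer base class and are worth 
double-checking for your version):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class AtLeastOnceKafkaSink {

    public static void wire(StreamExecutionEnvironment env, DataStream<String> enriched) {
        // Periodic checkpoints are what a restart falls back to; without them
        // there is no "last known" state to recover.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092");   // placeholder

        FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
                "output-topic",               // placeholder topic
                new SimpleStringSchema(),
                props);

        // Flush pending records on every checkpoint and fail (rather than just
        // log) on send errors, so a producer timeout triggers recovery from the
        // last checkpoint instead of silently dropping data.
        producer.setFlushOnCheckpoint(true);
        producer.setLogFailuresOnly(false);

        enriched.addSink(producer);
    }
}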

Thanks, Ashish

> On Jan 23, 2018, at 5:03 AM, Fabian Hueske  wrote:
> 
> Hi Ashish,
> 
> Originally, Flink always performed full recovery in case of a failure, i.e., 
> it restarted the complete application.
> There is some ongoing work to improve this and make recovery more 
> fine-grained (FLIP-1 [1]). 
> Some parts have been added for 1.3.0.
> 
> I'm not familiar with the details, but Stefan (in CC) should be able to 
> answer your specific question.
> 
> Best, Fabian
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
>  
> 
> 
> 2018-01-19 20:59 GMT+01:00 ashish pok  >:
> Team,
> 
> One more question to the community regarding hardening Flink Apps.
> 
> Let me start off by saying we do have known Kafka bottlenecks which we are in 
> the midst of resolving. So during certain times of day, a lot of our Flink 
> Apps are seeing Kafka Producer timeout issues. Most of the logs are some 
> flavor of this:
> 
> java.lang.Exception: Failed to send data to Kafka: Expiring 28 record(s) for 
> dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>   at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>   at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 28 
> record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation 
> plus linger time
> 
> Timeouts are not necessarily good, but I am sure we understand this is bound 
> to happen (hopefully less often). 
> 
> The issue for us, however, is that it almost looks like Flink is stopping and 
> restarting all operators (a lot of other operators including Map, Reduce and 
> Process functions, if not all) along with the Kafka Producers. We are processing a 
> pretty substantial load in Flink and don't really intend to enable Rocks/HDFS 
> checkpointing in some of these Apps - we are ok to sustain so

Re: Task Manager detached under load

2018-01-24 Thread Ashish Pokharel
I haven’t gotten much further with this. It doesn’t look GC related - at least 
the GC counters were not that atrocious. However, my main concern is: once the 
load subsides, why aren’t the TM and JM connecting again? That doesn’t look 
normal. I could definitely tell the JM was listening on the port, and from the logs 
it does appear the TM is trying to tell the JM that it is still alive. 

Thanks, Ashish

> On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard  
> wrote:
> 
> Hi. 
> 
> Did you find a reason for the detaching ?
> I sometimes see the same on our system running Flink 1.4 on dc/os. I have 
> enabled taskmanager.Debug.memory.startlogthread for debugging. 
> 
> Med venlig hilsen / Best regards
> Lasse Nedergaard
> 
> 
> On 20 Jan 2018, at 12.57, Kien Truong wrote:
> 
>> Hi,
>> 
>> You should enable and check your garbage collection log.
>> 
>> We've encountered case where Task Manager disassociated due to long GC pause.
>> 
>> Regards,
>> 
>> Kien
>> On 1/20/2018 1:27 AM, ashish pok wrote:
>>> Hi All,
>>> 
>>> We have hit some load related issues and was wondering if any one has some 
>>> suggestions. We are noticing task managers and job managers being detached 
>>> from each other under load and never really sync up again. As a result, 
>>> Flink session shows 0 slots available for processing. Even though, apps are 
>>> configured to restart it isn't really helping as there are no slots 
>>> available to run the apps.
>>> 
>>> 
>>> Here are excerpt from logs that seemed relevant. (I am trimming out rest of 
>>> the logs for brevity)
>>> 
>>> Job Manager:
>>> 2018-01-19 12:38:00,423 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager-  Starting 
>>> JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>> 
>>> 2018-01-19 12:38:00,792 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager-  Maximum 
>>> heap size: 16384 MiBytes
>>> 2018-01-19 12:38:00,794 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager-  Hadoop 
>>> version: 2.6.5
>>> 2018-01-19 12:38:00,794 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager-  JVM 
>>> Options:
>>> 2018-01-19 12:38:00,794 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager- 
>>> -Xms16384m
>>> 2018-01-19 12:38:00,794 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager- 
>>> -Xmx16384m
>>> 2018-01-19 12:38:00,795 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager- 
>>> -XX:+UseG1GC
>>> 
>>> 2018-01-19 12:38:00,908 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration- Loading 
>>> configuration property: jobmanager.rpc.port, 6123
>>> 2018-01-19 12:38:00,908 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration- Loading 
>>> configuration property: jobmanager.heap.mb, 16384
>>> 
>>> 
>>> 2018-01-19 12:53:34,671 WARN  akka.remote.RemoteWatcher 
>>> - Detected unreachable: [akka.tcp://flink@:37840]
>>> 2018-01-19 12:53:34,676 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager- Task 
>>> manager akka.tcp://flink@:37840/user/taskmanager terminated.
>>> 
>>> -- So once Flink session boots up, we are hitting it with pretty heavy 
>>> load, which typically results in the WARN above
>>> 
>>> Task Manager:
>>> 2018-01-19 12:38:01,002 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager  -  Starting 
>>> TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager  -  Hadoop 
>>> version: 2.6.5
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM 
>>> Options:
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager  - 
>>> -Xms16384M
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager  - 
>>> -Xmx16384M
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager  - 
>>> -XX:MaxDirectMemorySize=8388607T
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager  - 
>>> -XX:+UseG1GC
>>> 
>>> 2018-01-19 12:38:01,392 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration- Loading 
>>> configuration property: jobmanager.rpc.port, 6123
>>> 2018-01-19 12:38:01,392 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration- Loading 
>>> configuration property: jobmanager.heap.mb, 16384
>>> 
>>> 
>>> 2018-01-19 12:54:48,626 WARN  akka.remote.RemoteWatcher 
>>> - Detected unreachable: [akka.tcp://flink@:6123]
>>> 2018-01-19 12:54:48,690 INFO  akka.remote.Remoting  
>>> - Quarantined address [akka.tcp://flink@:6123

Re: some question about isChainable

2018-01-24 Thread Marvin777
Thank you for your reply. I've learned.

Note that, with a local debug of Flink 1.3.1, I found that restrictions
have been added in addEdgeInternal(), like this:
[inline image omitted]


Best regard.

2018-01-25 1:17 GMT+08:00 Aljoscha Krettek :

> I think this might just be overly cautious because it's not incorrect to
> check these. Also, in fact I think you could have "forward" with differing
> parallelism in some earlier versions which would lead to some unexpected
> results. Not sure if that is still possible now.
>
> > On 24. Jan 2018, at 09:50, Marvin777  wrote:
> >
> > Hi all:
> >
> > We all know that the conditions for chaining are 9, like this:
> > 
> > However, I have a question about 2, 3, 8 in the figure above.
> > As far as I know, the operator cannot be null, and If the partitioner of
> the edge is forward, then their parallelism must be the same. Maybe I'm
> completely missing something...
> >
> > Waiting for your reply.
> >
> > Best regard.
>
>


Problems caused by use of hadoop classpath bash command

2018-01-24 Thread Ken Krugler
Hi all,

With Flink 1.4 and FLINK-7477, I ran into a problem with jar versions for 
HttpCore when using the AWS SDK to read from S3.
jar versions for HttpCore, when using the AWS SDK to read from S3.

I believe the issue is that even when setting classloader.resolve-order to 
child-first in flink-conf.yaml, the change to put all jars returned by “hadoop 
classpath” on the classpath means that classes in these jars are found before 
the classes in my shaded Flink uber jar.

If I ensure that I don’t have the “hadoop” command set up on my Bash path, then 
I don’t run into this issue.

Does this make sense, or is there something else going on that I can fix to 
avoid this situation?

Regards,

— Ken


http://about.me/kkrugler
+1 530-210-6378



Send ACK when all records of file are processed

2018-01-24 Thread Vinay Patil
Hi Guys,

Following is how my pipeline looks (DataStream API) :

[1] Read the data from the csv file
[2] KeyBy it by some id
[3] Do the enrichment and write it to DB

[1] reads the data in sequence as it has a parallelism of 1, and I have the
default parallelism for the other operators.

I want to generate a response (ack) when all the data of the file is
processed. How can I achieve this ?

One solution I can think of is to have an EOF dummy record in the file and a
unique field shared by all the records in that file. Doing a keyBy on this field
will make sure that all records are sent to a single slot. So, when the EOF
dummy record is read, I can generate a response/ack.
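
A rough sketch of what I mean (the record fields and parsing here are made up for
illustration; in the real job the enrichment and DB write happen before the ack step):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FileAckSketch {

    // Hypothetical record type: one CSV line plus an end-of-file flag.
    public static class Record {
        public String fileName;
        public boolean endOfFile;
        public String payload;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.readTextFile("file:///path/to/input.csv")        // parallelism-1 source in the real job
            .map(new MapFunction<String, Record>() {
                @Override
                public Record map(String line) {
                    Record r = new Record();
                    r.fileName = "input.csv";                 // hypothetical; derive it from the input in reality
                    r.endOfFile = line.startsWith("EOF");     // the dummy last record of the file
                    r.payload = line;
                    return r;
                }
            })
            .keyBy(new KeySelector<Record, String>() {
                @Override
                public String getKey(Record r) {
                    return r.fileName;                        // unique per file, so all records hit one slot
                }
            })
            .flatMap(new RichFlatMapFunction<Record, String>() {
                @Override
                public void flatMap(Record r, Collector<String> out) {
                    // enrichment + DB write would happen here
                    if (r.endOfFile) {
                        out.collect("ACK: finished " + r.fileName);
                    }
                }
            })
            .print();

        env.execute("file-ack-sketch");
    }
}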

Is there a better way I can deal with this ?


Regards,
Vinay Patil


Re: Does web ui hang for 1.4 for job submission ?

2018-01-24 Thread Vishal Santoshi
Exactly the same context and issue. We too depend on args and server-side
validation, and on error the UI hangs.

On Wed, Jan 24, 2018 at 1:13 PM, Lasse Nedergaard  wrote:

> Hi.
>
> Our jobs take a number of command line options and validate them during
> startup and throw an exception if something is wrong. When this exception
> happens ui hang all prev. uploaded jars are removed most task manager are
> lost and all running jobs terminated. We running Flink 1.4 on dc/os
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> On 24 Jan 2018, at 18.51, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
> Yep, got it. It seems that the response when  in  an error state is
> reported by JM is not being rendered.
>
>  We can see the response in the UI debugger but not on the UI itself.
>
> On Wed, Jan 24, 2018 at 12:28 PM, Ufuk Celebi  wrote:
>
>> It's not a stupid question at all! Try the following please:
>> 1) Use something like Chrome's Developer Tools to check the responses
>> you get from the web UI. If you see an error there, that should point
>> you to what's going on.
>> 2) Enable DEBUG logging for the JobManager and check the logs (if 1
>> doesn't help)
>>
>> We just merged a change into master that does a better job of
>> reporting any errors that occur during submission.
>>
>> – Ufuk
>>
>>
>> On Wed, Jan 24, 2018 at 5:24 PM, Vishal Santoshi
>>  wrote:
>> > We have had 2 different installations and when e go submit a job, it
>> gives
>> > us the spinning wheel and the request never reaches the JM. CLI works
>> > without issues. Is this us alone or some one else seen this.
>> >
>> > Again 1.3.2 does not have the issue and we have the same nodes, n/w etc
>> so
>> > we sure nothing should have changed on our side.
>> >
>> > Apologies if this is a stupid question with an obvious solution.
>> >
>> > Regards
>> >
>> > Vishal
>>
>
>


Re: Does web ui hang for 1.4 for job submission ?

2018-01-24 Thread Lasse Nedergaard
Hi. 

Our jobs take a number of command line options and validate them during startup, 
throwing an exception if something is wrong. When this exception happens, the UI 
hangs, all previously uploaded jars are removed, most task managers are lost, and 
all running jobs are terminated. We are running Flink 1.4 on DC/OS.

Med venlig hilsen / Best regards
Lasse Nedergaard


> On 24 Jan 2018, at 18.51, Vishal Santoshi wrote:
> 
> Yep, got it. It seems that the response when  in  an error state is reported 
> by JM is not being rendered.
> 
>  We can see the response in the UI debugger but not on the UI itself.
> 
>> On Wed, Jan 24, 2018 at 12:28 PM, Ufuk Celebi  wrote:
>> It's not a stupid question at all! Try the following please:
>> 1) Use something like Chrome's Developer Tools to check the responses
>> you get from the web UI. If you see an error there, that should point
>> you to what's going on.
>> 2) Enable DEBUG logging for the JobManager and check the logs (if 1
>> doesn't help)
>> 
>> We just merged a change into master that does a better job of
>> reporting any errors that occur during submission.
>> 
>> – Ufuk
>> 
>> 
>> On Wed, Jan 24, 2018 at 5:24 PM, Vishal Santoshi
>>  wrote:
>> > We have had 2 different installations and when e go submit a job, it gives
>> > us the spinning wheel and the request never reaches the JM. CLI works
>> > without issues. Is this us alone or some one else seen this.
>> >
>> > Again 1.3.2 does not have the issue and we have the same nodes, n/w etc so
>> > we sure nothing should have changed on our side.
>> >
>> > Apologies if this is a stupid question with an obvious solution.
>> >
>> > Regards
>> >
>> > Vishal
> 


Re: Does web ui hang for 1.4 for job submission ?

2018-01-24 Thread Vishal Santoshi
The error is coming back as plain text with status 500.

On Wed, Jan 24, 2018 at 12:51 PM, Vishal Santoshi  wrote:

> Yep, got it. It seems that the response when  in  an error state is
> reported by JM is not being rendered.
>
>  We can see the response in the UI debugger but not on the UI itself.
>
> On Wed, Jan 24, 2018 at 12:28 PM, Ufuk Celebi  wrote:
>
>> It's not a stupid question at all! Try the following please:
>> 1) Use something like Chrome's Developer Tools to check the responses
>> you get from the web UI. If you see an error there, that should point
>> you to what's going on.
>> 2) Enable DEBUG logging for the JobManager and check the logs (if 1
>> doesn't help)
>>
>> We just merged a change into master that does a better job of
>> reporting any errors that occur during submission.
>>
>> – Ufuk
>>
>>
>> On Wed, Jan 24, 2018 at 5:24 PM, Vishal Santoshi
>>  wrote:
>> > We have had 2 different installations and when e go submit a job, it
>> gives
>> > us the spinning wheel and the request never reaches the JM. CLI works
>> > without issues. Is this us alone or some one else seen this.
>> >
>> > Again 1.3.2 does not have the issue and we have the same nodes, n/w etc
>> so
>> > we sure nothing should have changed on our side.
>> >
>> > Apologies if this is a stupid question with an obvious solution.
>> >
>> > Regards
>> >
>> > Vishal
>>
>
>


Re: Does web ui hang for 1.4 for job submission ?

2018-01-24 Thread Vishal Santoshi
Yep, got it. It seems that the response, when an error state is reported by the
JM, is not being rendered.

 We can see the response in the UI debugger but not in the UI itself.

On Wed, Jan 24, 2018 at 12:28 PM, Ufuk Celebi  wrote:

> It's not a stupid question at all! Try the following please:
> 1) Use something like Chrome's Developer Tools to check the responses
> you get from the web UI. If you see an error there, that should point
> you to what's going on.
> 2) Enable DEBUG logging for the JobManager and check the logs (if 1
> doesn't help)
>
> We just merged a change into master that does a better job of
> reporting any errors that occur during submission.
>
> – Ufuk
>
>
> On Wed, Jan 24, 2018 at 5:24 PM, Vishal Santoshi
>  wrote:
> > We have had 2 different installations and when e go submit a job, it
> gives
> > us the spinning wheel and the request never reaches the JM. CLI works
> > without issues. Is this us alone or some one else seen this.
> >
> > Again 1.3.2 does not have the issue and we have the same nodes, n/w etc
> so
> > we sure nothing should have changed on our side.
> >
> > Apologies if this is a stupid question with an obvious solution.
> >
> > Regards
> >
> > Vishal
>


Re: Does web ui hang for 1.4 for job submission ?

2018-01-24 Thread Ufuk Celebi
It's not a stupid question at all! Try the following please:
1) Use something like Chrome's Developer Tools to check the responses
you get from the web UI. If you see an error there, that should point
you to what's going on.
2) Enable DEBUG logging for the JobManager and check the logs (if 1
doesn't help)

We just merged a change into master that does a better job of
reporting any errors that occur during submission.

– Ufuk


On Wed, Jan 24, 2018 at 5:24 PM, Vishal Santoshi
 wrote:
> We have had 2 different installations and when e go submit a job, it gives
> us the spinning wheel and the request never reaches the JM. CLI works
> without issues. Is this us alone or some one else seen this.
>
> Again 1.3.2 does not have the issue and we have the same nodes, n/w etc so
> we sure nothing should have changed on our side.
>
> Apologies if this is a stupid question with an obvious solution.
>
> Regards
>
> Vishal


Re: some question about isChainable

2018-01-24 Thread Aljoscha Krettek
I think this might just be overly cautious because it's not incorrect to check 
these. Also, in fact I think you could have "forward" with differing 
parallelism in some earlier versions which would lead to some unexpected 
results. Not sure if that is still possible now.

> On 24. Jan 2018, at 09:50, Marvin777  wrote:
> 
> Hi all:
> 
> We all know that the conditions for chaining are 9, like this:
> 
> However, I have a question about 2, 3, 8 in the figure above. 
> As far as I know, the operator cannot be null, and If the partitioner of the 
> edge is forward, then their parallelism must be the same. Maybe I'm 
> completely missing something...
> 
> Waiting for your reply.
> 
> Best regard.



Re: Flink CEP exception during RocksDB update

2018-01-24 Thread Kostas Kloudas
Hi Varun,

Thanks for taking time to look into it. Could you give a sample input and your 
pattern to reproduce the problem?
That would help a lot at figuring out the cause of the problem.

Thanks,
Kostas

> On Jan 23, 2018, at 5:40 PM, Varun Dhore  wrote:
> 
> Hi Kostas,
> 
> I was able to reproduce the error with 1.4.0. After upgrading the cluster to 
> 1.5 snapshot and running through the same data I am still experiencing the 
> same exception. CEP patterns that I am running are using followed by patterns 
> e.g AfBfC. From my experience I was never able to get stable execution when 
> checkpoints are enabled. When I disable checkpoints CEP jobs are running 
> fine. Aside from this particular error I also notice that majority of 
> checkpoints expire as the do not complete within configured 5 min timeout 
> period. Any suggestions on further debugging runtime checkpoints would be 
> very helpful. 
> Thanks in advance for your assistance.
> 
> Regards,
> Varun 
> 
> On Jan 18, 2018, at 8:11 AM, Kostas Kloudas  > wrote:
> 
>> Thanks a lot Varun!
>> 
>> Kostas
>> 
>>> On Jan 17, 2018, at 9:59 PM, Varun Dhore >> > wrote:
>>> 
>>> Thank you Kostas. Since this error is not easily reproducible on my end 
>>> I’ll continue testing this and confirm the resolution once I am able to do 
>>> so.
>>> 
>>> Thanks,
>>> Varun 
>>> 
>>> Sent from my iPhone
>>> 
>>> On Jan 15, 2018, at 10:21 AM, Kostas Kloudas >> > wrote:
>>> 
 Hi Varun,
 
 This can be related to this issue: 
 https://issues.apache.org/jira/browse/FLINK-8226 
 
 which is currently fixed on the master.
 
 Could you please try the current master to see if the error persists?
 
 Thanks,
 Kostas
 
> On Jan 15, 2018, at 4:09 PM, Varun Dhore  > wrote:
> 
> 
> 
>> Hello Flink community,
>>  
>> I have encountered following exception while testing 1.4.0 release. This 
>> error is occurring intermittently and my CEP job keeps restarting after 
>> this exception. I am running the job with Event time semantics and 
>> checkpoints enabled.
>>  
>>  
>> java.lang.RuntimeException: Exception occurred while 
>> processing valve output watermark:
>> at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>> at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>> at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>> at 
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error while adding 
>> data to RocksDB
>> at 
>> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:102)
>> at 
>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:276)
>> at 
>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:248)
>> at 
>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>> at 
>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>> ... 7 more
>> Caused by: java.lang.IllegalStateException: Could not find 
>> id for entry: SharedBufferEntry(ValueTimeWrapper(Event(id: 1,name: ”e1”, 
>> timestamp: 1515593398897), 1515593398897, 0), [SharedBuf

Does web ui hang for 1.4 for job submission ?

2018-01-24 Thread Vishal Santoshi
We have had 2 different installations, and when we go to submit a job, it gives
us the spinning wheel and the request never reaches the JM. The CLI works
without issues. Is this just us, or has someone else seen this?

Again, 1.3.2 does not have the issue, and we have the same nodes, n/w etc., so
we are sure nothing should have changed on our side.

Apologies if this is a stupid question with an obvious solution.

Regards

Vishal


S3 for state backend in Flink 1.4.0

2018-01-24 Thread Marchant, Hayden
Hi,

We have a Flink Streaming application that uses S3 for storing checkpoints. We 
are not using 'regular' S3, but rather IBM Object Storage, which has an 
S3-compatible connector. We had quite some challenges in overriding the endpoint 
from the default s3.amazonaws.com to our internal IBM Object Storage endpoint. 
In 1.3.2, we managed to get this working by providing our own jets3t.properties 
file that overrode s3service.s3-endpoint 
(https://jets3t.s3.amazonaws.com/toolkit/configuration.html)

When upgrading to 1.4.0, we added a dependency on the flink-s3-fs-hadoop 
artifact. It seems that our overriding via jets3t.properties is no longer 
relevant, since this no longer uses the Hadoop implementation. 

Is there a way to override this default endpoint, or can we do this with the presto 
implementation? Please note that if we provide the endpoint in the URL for the 
state backend, it simply appends s3.amazonaws.com to the URL, for example 
s3://myobjectstorageendpoint.s3.amazonaws.com.

Are there any other solutions, such as 'rolling back' to the Hadoop 
implementation of S3?

Thanks,
Hayden


Re: Should multiple apache flink task managers have strong identity? Also, should I point their state.checkpoints.dir to the same HDFS?

2018-01-24 Thread Felipe Cavalcanti
Thanks for the answer Patrick!

Awesome! I've made the deployment the following way:

The JobManager is a StatefulSet of size 1. I'm using EFS (as a persistent
volume) and ZooKeeper for persisting its state across reboots; the reason
it's a StatefulSet is to have a stable DNS name to point at it,
"jobmanager-0.jobmanager". The TaskManagers are deployed as a Deployment,
with EFS for persisting checkpoints and snapshots.

It's working great so far.

Regards

2018-01-24 13:10 GMT-02:00 Patrick Lucas :
> Hi Felipe,
>
> No, using a Deployment for taskmanagers should be fine—they don't need a
> strong identity. For intracluster communication, the taskmanager's hostname
> is used by default, which in most Kubernetes setups is resolvable to the Pod
> IP.
>
> state.checkpoints.dir should be configured the same for all jobmanagers and
> taskmanagers. All members of the cluster need to be able to see all the
> data, and Flink ensures that things are written to the correct subpath.
>
> --
> Patrick Lucas
>
> On Mon, Jan 22, 2018 at 7:23 PM, Felipe Cavalcanti 
> wrote:
>>
>> Hi,
>>
>> I'm deploying flink to kubernetes and I've some doubts...
>>
>> First one is if the task managers should have strong identity (in
>> which case I will use statefulsets for deploying them). Second one is
>> if I should point rocksdb state.checkpoint.dir in all task managers to
>> the same HDFS path or if each of them should point to their own...
>>
>> Thanks!
>
>


Re: Should multiple apache flink task managers have strong identity? Also, should I point their state.checkpoints.dir to the same HDFS?

2018-01-24 Thread Patrick Lucas
Hi Felipe,

No, using a Deployment for taskmanagers should be fine—they don't need a
strong identity. For intracluster communication, the taskmanager's hostname
is used by default, which in most Kubernetes setups is resolvable to the
Pod IP.

state.checkpoints.dir should be configured the same for all jobmanagers and
taskmanagers. All members of the cluster need to be able to see all the
data, and Flink ensures that things are written to the correct subpath.

--
Patrick Lucas

On Mon, Jan 22, 2018 at 7:23 PM, Felipe Cavalcanti 
wrote:

> Hi,
>
> I'm deploying flink to kubernetes and I've some doubts...
>
> First one is if the task managers should have strong identity (in
> which case I will use statefulsets for deploying them). Second one is
> if I should point rocksdb state.checkpoint.dir in all task managers to
> the same HDFS path or if each of them should point to their own...
>
> Thanks!
>


Re: Error with Avro/Kyro after upgrade to Flink 1.4

2018-01-24 Thread Edward
Thanks, Stephan. You're correct that there was in fact no shading issue in
the official flink-dist_2.11-1.4.0.jar. We are using the jar in the flink
docker image, but I misspoke when I said ObjectMapper appeared there
unshaded. It turned out the issue was really a version conflict in our job's
uber-jar file between jackson-core and jackson-databind, which I was able to
resolve.

Regarding the VerifyError, please post back here when you have a patch or a
link to a pull request that we can track.





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Failing to recover once checkpoint fails

2018-01-24 Thread Aljoscha Krettek
Did you see my second mail?

> On 24. Jan 2018, at 12:50, Vishal Santoshi  wrote:
> 
> As in, if there are chk handles in zk, there should no reason to start a new 
> job ( bad handle, no hdfs connectivity etc ),
>  yes that sums it up.
> 
> On Wed, Jan 24, 2018 at 5:35 AM, Aljoscha Krettek  > wrote:
> Wait a sec, I just checked out the code again and it seems we already do 
> that: 
> https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L210
>  
> 
> 
> If there were some checkpoints but none could be read we fail recovery.
> 
> 
>> On 24. Jan 2018, at 11:32, Aljoscha Krettek > > wrote:
>> 
>> That sounds reasonable: We would keep the first fix, i.e. never delete 
>> checkpoints if they're "corrupt", only when they're subsumed. Additionally, 
>> we fail the job if there are some checkpoints in ZooKeeper but none of them 
>> can be restored to prevent the case where a job starts from scratch even 
>> though it shouldn't.
>> 
>> Does that sum it up?
>> 
>>> On 24. Jan 2018, at 01:19, Vishal Santoshi >> > wrote:
>>> 
>>> If we hit the retry limit, abort the job. In our case we will restart from 
>>> the last SP ( we, as any production pipeline, do it n times a day ) and that 
>>> I would think should be OK for most folks ?
>>> 
>>> On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi 
>>> mailto:vishal.santo...@gmail.com>> wrote:
>>> Thank you for considering this. If I understand you correctly.
>>> 
>>> * CHK pointer on ZK for a CHK state on hdfs was done successfully.
>>> * Some issue restarted the pipeline.
>>> * The NN was down unfortunately and flink could not retrieve the  CHK state 
>>> from the CHK pointer on ZK.
>>> 
>>> Before 
>>> 
>>> * The CHK pointer was being removed and the job started from a brand new 
>>> slate.
>>> 
>>> After ( this fix on 1.4 +) 
>>> 
>>> * do not delete the CHK pointer ( It has to be subsumed to be deleted ). 
>>> * Flink keeps using this CHK pointer ( if retry is > 0 and till we hit any 
>>> retry limit ) to restore state
>>> * NN comes back 
>>> * Flink restores state on the next retry.
>>> 
>>> I would hope that is the sequence to follow.
>>> 
>>> Regards.
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek >> > wrote:
>>> Hi Vishal,
>>> 
>>> I think you might be right. We fixed the problem that checkpoints where 
>>> dropped via https://issues.apache.org/jira/browse/FLINK-7783 
>>> . However, we still have 
>>> the problem that if the DFS is not up at all then it will look as if the 
>>> job is starting from scratch. However, the alternative is failing the job, 
>>> in which case you will also never be able to restore from a checkpoint. 
>>> What do you think?
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>> 
 On 23. Jan 2018, at 10:15, Fabian Hueske >>> > wrote:
 
 Sorry for the late reply.
 
 I created FLINK-8487 [1] to track this problem
 
 @Vishal, can you have a look and check if if forgot some details? I logged 
 the issue for Flink 1.3.2, is that correct?
 Please add more information if you think it is relevant.
 
 Thanks,
 Fabian
 
 [1] https://issues.apache.org/jira/browse/FLINK-8487 
 
 
 2018-01-18 22:14 GMT+01:00 Vishal Santoshi >>> >:
 Or this one 
 
 https://issues.apache.org/jira/browse/FLINK-4815 
 
 
 On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi 
 mailto:vishal.santo...@gmail.com>> wrote:
 ping. 
 
 This happened again on production and it seems reasonable to abort 
 when a checkpoint is not found rather than behave as if it is a brand new 
 pipeline.  
 
 On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi 
 mailto:vishal.santo...@gmail.com>> wrote:
 Folks sorry for being late on this. Can some body with the knowledge of 
 this code base create a jira issue for the above ? We have seen this more 
 than once on production.
 
 On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek >>> > wrote:
 Hi Vishal,
 
 Some relevant Jira issues for you are:
 
  - https://issues.apache.org/jira/browse/FLINK-4808: 
  Allow skipping failed 
 checkpoints
  - https://issues.apache.org/jira/browse/FLINK-4815: 
 

Re: Failing to recover once checkpoint fails

2018-01-24 Thread Vishal Santoshi
As in, if there are chk handles in ZK, there should be no reason to start a
new job (bad handle, no hdfs connectivity, etc.) - yes, that sums it up.

On Wed, Jan 24, 2018 at 5:35 AM, Aljoscha Krettek 
wrote:

> Wait a sec, I just checked out the code again and it seems we already do
> that: https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bdd
> c0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/
> runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L210
>
> If there were some checkpoints but none could be read we fail recovery.
>
>
> On 24. Jan 2018, at 11:32, Aljoscha Krettek  wrote:
>
> That sounds reasonable: We would keep the first fix, i.e. never delete
> checkpoints if they're "corrupt", only when they're subsumed. Additionally,
> we fail the job if there are some checkpoints in ZooKeeper but none of them
> can be restored to prevent the case where a job starts from scratch even
> though it shouldn't.
>
> Does that sum it up?
>
> On 24. Jan 2018, at 01:19, Vishal Santoshi 
> wrote:
>
> If we hit the retry limit, abort the job. In our case we will restart from
> the last SP ( we, as any production pipeline, do it n times a day ) and that
> I would think should be OK for most folks ?
>
> On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Thank you for considering this. If I understand you correctly.
>>
>> * CHK pointer on ZK for a CHK state on hdfs was done successfully.
>> * Some issue restarted the pipeline.
>> * The NN was down unfortunately and flink could not retrieve the  CHK
>> state from the CHK pointer on ZK.
>>
>> Before
>>
>> * The CHK pointer was being removed and the job started from a brand new
>> slate.
>>
>> After ( this fix on 1.4 +)
>>
>> * do not delete the CHK pointer ( It has to be subsumed to be deleted ).
>> * Flink keeps using this CHK pointer ( if retry is > 0 and till we hit
>> any retry limit ) to restore state
>> * NN comes back
>> * Flink restores state on the next retry.
>>
>> I would hope that is the sequence to follow.
>>
>> Regards.
>>
>>
>>
>>
>>
>>
>>
>>
>> On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi Vishal,
>>>
>>> I think you might be right. We fixed the problem that checkpoints where
>>> dropped via https://issues.apache.org/jira/browse/FLINK-7783. However,
>>> we still have the problem that if the DFS is not up at all then it will
>>> look as if the job is starting from scratch. However, the alternative is
>>> failing the job, in which case you will also never be able to restore from
>>> a checkpoint. What do you think?
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 23. Jan 2018, at 10:15, Fabian Hueske  wrote:
>>>
>>> Sorry for the late reply.
>>>
>>> I created FLINK-8487 [1] to track this problem
>>>
>>> @Vishal, can you have a look and check if if forgot some details? I
>>> logged the issue for Flink 1.3.2, is that correct?
>>> Please add more information if you think it is relevant.
>>>
>>> Thanks,
>>> Fabian
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-8487
>>>
>>> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi :
>>>
 Or this one

 https://issues.apache.org/jira/browse/FLINK-4815

 On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> ping.
>
> This happened again on production and it seems reasonable to abort
> when a checkpoint is not found rather than behave as if it is a brand new
> pipeline.
>
> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Folks sorry for being late on this. Can some body with the knowledge
>> of this code base create a jira issue for the above ? We have seen this
>> more than once on production.
>>
>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <
>> aljos...@apache.org> wrote:
>>
>>> Hi Vishal,
>>>
>>> Some relevant Jira issues for you are:
>>>
>>>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping
>>> failed checkpoints
>>>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic
>>> fallback to earlier checkpoint when checkpoint restore fails
>>>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always
>>> remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 9. Oct 2017, at 09:06, Fabian Hueske  wrote:
>>>
>>> Hi Vishal,
>>>
>>> it would be great if you could create a JIRA ticket with Blocker
>>> priority.
>>> Please add all relevant information of your detailed analysis, add a
>>> link to this email thread (see [1] for the web archive of the mailing
>>> list), and post the id of the JIRA issue here.
>>>
>>> Thanks for looking into this!
>>>
>>> Best regards,
>>> Fabian
>>>
>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>
>>> 201

Re: Scaling Flink

2018-01-24 Thread Hung
What about scaling up based on the number of task slots left?
You can obtain this information from Flink's REST endpoint.
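
For example, something along these lines (a minimal sketch; the JobManager address is
a placeholder, and as far as I know the /overview endpoint of the monitoring REST API
reports a "slots-available" field - double-check against your version):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class SlotsAvailableCheck {

    public static void main(String[] args) throws Exception {
        URL url = new URL("http://jobmanager:8081/overview");   // placeholder address
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();

        StringBuilder body = new StringBuilder();
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
        }

        // Crude extraction of the "slots-available" value from the JSON response;
        // a real setup would use a JSON library and feed this into its autoscaler.
        String json = body.toString();
        int idx = json.indexOf("\"slots-available\":");
        if (idx < 0) {
            System.out.println("Field not found: " + json);
            return;
        }
        int start = idx + "\"slots-available\":".length();
        int end = start;
        while (end < json.length() && Character.isDigit(json.charAt(end))) {
            end++;
        }
        System.out.println("Slots available: " + json.substring(start, end));
    }
}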

Cheers,

Sendoh



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Failing to recover once checkpoint fails

2018-01-24 Thread Aljoscha Krettek
Wait a sec, I just checked out the code again and it seems we already do that: 
https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L210
 


If there were some checkpoints but none could be read we fail recovery.

> On 24. Jan 2018, at 11:32, Aljoscha Krettek  wrote:
> 
> That sounds reasonable: We would keep the first fix, i.e. never delete 
> checkpoints if they're "corrupt", only when they're subsumed. Additionally, 
> we fail the job if there are some checkpoints in ZooKeeper but none of them 
> can be restored to prevent the case where a job starts from scratch even 
> though it shouldn't.
> 
> Does that sum it up?
> 
>> On 24. Jan 2018, at 01:19, Vishal Santoshi > > wrote:
>> 
>> If we hit the retry limit, abort the job. In our case we will restart from 
>> the last SP ( we as any production pile do it is n time s a day )  and that 
>> I would think should be OK for most folks ?
>> 
>> On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi > > wrote:
>> Thank you for considering this. If I understand you correctly.
>> 
>> * CHK pointer on ZK for a CHK state on hdfs was done successfully.
>> * Some issue restarted the pipeline.
>> * The NN was down unfortunately and flink could not retrieve the  CHK state 
>> from the CHK pointer on ZK.
>> 
>> Before 
>> 
>> * The CHK pointer was being removed and the job started from a brand new 
>> slate.
>> 
>> After ( this fix on 1.4 +) 
>> 
>> * do not delete the CHK pointer ( It has to be subsumed to be deleted ). 
>> * Flink keeps using this CHK pointer ( if retry is > 0 and till we hit any 
>> retry limit ) to restore state
>> * NN comes back 
>> * Flink restores state on the next retry.
>> 
>> I would hope that is the sequence to follow.
>> 
>> Regards.
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek > > wrote:
>> Hi Vishal,
>> 
>> I think you might be right. We fixed the problem that checkpoints where 
>> dropped via https://issues.apache.org/jira/browse/FLINK-7783 
>> . However, we still have 
>> the problem that if the DFS is not up at all then it will look as if the job 
>> is starting from scratch. However, the alternative is failing the job, in 
>> which case you will also never be able to restore from a checkpoint. What do 
>> you think?
>> 
>> Best,
>> Aljoscha
>> 
>> 
>>> On 23. Jan 2018, at 10:15, Fabian Hueske >> > wrote:
>>> 
>>> Sorry for the late reply.
>>> 
>>> I created FLINK-8487 [1] to track this problem
>>> 
>>> @Vishal, can you have a look and check if if forgot some details? I logged 
>>> the issue for Flink 1.3.2, is that correct?
>>> Please add more information if you think it is relevant.
>>> 
>>> Thanks,
>>> Fabian
>>> 
>>> [1] https://issues.apache.org/jira/browse/FLINK-8487 
>>> 
>>> 
>>> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi >> >:
>>> Or this one 
>>> 
>>> https://issues.apache.org/jira/browse/FLINK-4815 
>>> 
>>> 
>>> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi >> > wrote:
>>> ping. 
>>> 
>>> This happened again on production and it seems reasonable to abort when 
>>> a checkpoint is not found rather than behave as if it is a brand new 
>>> pipeline.  
>>> 
>>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi >> > wrote:
>>> Folks sorry for being late on this. Can some body with the knowledge of 
>>> this code base create a jira issue for the above ? We have seen this more 
>>> than once on production.
>>> 
>>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek >> > wrote:
>>> Hi Vishal,
>>> 
>>> Some relevant Jira issues for you are:
>>> 
>>>  - https://issues.apache.org/jira/browse/FLINK-4808: 
>>>  Allow skipping failed 
>>> checkpoints
>>>  - https://issues.apache.org/jira/browse/FLINK-4815: 
>>>  Automatic fallback to 
>>> earlier checkpoint when checkpoint restore fails
>>>  - https://issues.apache.org/jira/browse/FLINK-7783: 
>>>  Don't always remove 
>>> checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>> 
 On 9. Oct 2017, at 09:06, Fabian Hueske >>> > wrote:
 
 Hi Vishal,
 
 it would be great if you could create a JIRA ticket with Bloc

Re: Failing to recover once checkpoint fails

2018-01-24 Thread Aljoscha Krettek
That sounds reasonable: We would keep the first fix, i.e. never delete 
checkpoints if they're "corrupt", only when they're subsumed. Additionally, we 
fail the job if there are some checkpoints in ZooKeeper but none of them can be 
restored to prevent the case where a job starts from scratch even though it 
shouldn't.

Does that sum it up?

> On 24. Jan 2018, at 01:19, Vishal Santoshi  wrote:
> 
> If we hit the retry limit, abort the job. In our case we will restart from 
> the last SP ( we, as any production pipeline, do it n times a day ) and that I 
> would think should be OK for most folks ?
> 
> On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi  > wrote:
> Thank you for considering this. If I understand you correctly.
> 
> * CHK pointer on ZK for a CHK state on hdfs was done successfully.
> * Some issue restarted the pipeline.
> * The NN was down unfortunately and flink could not retrieve the  CHK state 
> from the CHK pointer on ZK.
> 
> Before 
> 
> * The CHK pointer was being removed and the job started from a brand new 
> slate.
> 
> After ( this fix on 1.4 +) 
> 
> * do not delete the CHK pointer ( It has to be subsumed to be deleted ). 
> * Flink keeps using this CHK pointer ( if retry is > 0 and till we hit any 
> retry limit ) to restore state
> * NN comes back 
> * Flink restores state on the next retry.
> 
> I would hope that is the sequence to follow.
> 
> Regards.
> 
> 
> 
> 
> 
> 
> 
> 
> On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek  > wrote:
> Hi Vishal,
> 
> I think you might be right. We fixed the problem that checkpoints where 
> dropped via https://issues.apache.org/jira/browse/FLINK-7783 
> . However, we still have 
> the problem that if the DFS is not up at all then it will look as if the job 
> is starting from scratch. However, the alternative is failing the job, in 
> which case you will also never be able to restore from a checkpoint. What do 
> you think?
> 
> Best,
> Aljoscha
> 
> 
>> On 23. Jan 2018, at 10:15, Fabian Hueske > > wrote:
>> 
>> Sorry for the late reply.
>> 
>> I created FLINK-8487 [1] to track this problem
>> 
>> @Vishal, can you have a look and check if if forgot some details? I logged 
>> the issue for Flink 1.3.2, is that correct?
>> Please add more information if you think it is relevant.
>> 
>> Thanks,
>> Fabian
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-8487 
>> 
>> 
>> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi > >:
>> Or this one 
>> 
>> https://issues.apache.org/jira/browse/FLINK-4815 
>> 
>> 
>> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi > > wrote:
>> ping. 
>> 
>> This happened again on production and it seems reasonable to abort when 
>> a checkpoint is not found rather than behave as if it is a brand new 
>> pipeline.  
>> 
>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi > > wrote:
>> Folks sorry for being late on this. Can some body with the knowledge of this 
>> code base create a jira issue for the above ? We have seen this more than 
>> once on production.
>> 
>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek > > wrote:
>> Hi Vishal,
>> 
>> Some relevant Jira issues for you are:
>> 
>>  - https://issues.apache.org/jira/browse/FLINK-4808: 
>>  Allow skipping failed 
>> checkpoints
>>  - https://issues.apache.org/jira/browse/FLINK-4815: 
>>  Automatic fallback to 
>> earlier checkpoint when checkpoint restore fails
>>  - https://issues.apache.org/jira/browse/FLINK-7783: 
>>  Don't always remove 
>> checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>> 
>> Best,
>> Aljoscha
>> 
>> 
>>> On 9. Oct 2017, at 09:06, Fabian Hueske >> > wrote:
>>> 
>>> Hi Vishal,
>>> 
>>> it would be great if you could create a JIRA ticket with Blocker priority.
>>> Please add all relevant information of your detailed analysis, add a link 
>>> to this email thread (see [1] for the web archive of the mailing list), and 
>>> post the id of the JIRA issue here.
>>> 
>>> Thanks for looking into this!
>>> 
>>> Best regards,
>>> Fabian
>>> 
>>> [1] https://lists.apache.org/list.html?user@flink.apache.org 
>>> 
>>> 
>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi >> >:
>>> Thank you for confirming. 
>>>
>>> 
>>>  I think this is a critical bug. In essence any checkpoint store ( 
>>> hdfs/S3/File)  will loose state if it is unavailable at resume. This 
>>> becomes all the more painful with your confirmin

Re: state.checkpoints.dir

2018-01-24 Thread Chesnay Schepler
Something you could try is loading the GlobalConfiguration singleton before 
executing the job and setting the parameter there.
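
Alternatively, when running from the IDE you can hand a Configuration to a local 
environment. A minimal sketch, assuming the createLocalEnvironment(parallelism, 
Configuration) overload picks the setting up in your version:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalCheckpointDirExample {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same key as in flink-conf.yaml; only used for this local run.
        conf.setString("state.checkpoints.dir", "file:///tmp/flink-checkpoints");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);
        env.enableCheckpointing(10_000);

        env.fromElements(1, 2, 3).print();
        env.execute("local-checkpoint-dir-sketch");
    }
}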

On 23.01.2018 19:28, Biswajit Das wrote:

Hi Hao ,

Thank you for the reply. I was more trying to find out how to manipulate it 
when I run locally from the IDE.


~ Biswajit

On Mon, Jan 22, 2018 at 12:56 PM, Hao Sun > wrote:


We generate flink.conf on the fly, so we can use different values
based on environment.

On Mon, Jan 22, 2018 at 12:53 PM Biswajit Das
mailto:biswajit...@gmail.com>> wrote:

Hello ,

 Is there any hack to supply *state.checkpoints.dir* as an
argument or JVM parameter when running locally? I can change
the source *CheckpointCoordinator* and make it work; trying
to find out if there are any shortcuts.

Thank you
~ Biswajit






Re: Error with Avro/Kyro after upgrade to Flink 1.4

2018-01-24 Thread Stephan Ewen
I would assume that once you build Flink with correct shading and add all
the required Jackson dependencies to your application jar, all works fine.

On Jan 24, 2018 10:23, "Stephan Ewen"  wrote:

> Hi!
>
> I have a patch coming up for Verify Error
>
> Concerning the Jackson Error - did you build Flink yourself? It looks like
> flink-dist was not built properly, the shading is incorrect.
> The class "com.fasterxml.jackson.databind.ObjectMapper" should not be in
> the jar in an unshaded manner.
>
> My first guess is you build Flink yourself with Maven version >= 3.3.0
>
> Please see here: https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/start/building.html#dependency-shading
>
> Stephan
>
>
> On Tue, Jan 23, 2018 at 5:21 PM, Edward  wrote:
>
>> Thanks for the follow-up Stephan.
>>
>> I have been running this job from a built jar file which was submitted to
>> an
>> existing Flink 1.4 cluster, not from within the IDE. Interestingly, I am
>> now
>> getting the same error when any of the following 3 conditions are true:
>> 1. I run the job on a local cluster from within my IDE
>> 2. I run the job on a cluster where "classloader.resolve-order:
>> parent-first"
>> 3. I build the uber jar file without including flink-java,
>> flink-streaming-java and flink-clients (I changed those to "provided" as
>> you
>> suggested, so they aren't in my jar)
>>
>> If any of those 3 cases are true, I get a new NoClassDefFoundError. This
>> error is caused because com.fasterxml.jackson.databind.ObjectMapper is
>> present in flink-dist_2.11-1.4.0.jar, but
>> com.fasterxml.jackson.databind.SerializationConfig is not (only the
>> shaded
>> version:
>> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.
>> databind.SerializationConfig)
>>
>> java.lang.NoClassDefFoundError: Could not initialize class
>> com.fasterxml.jackson.databind.SerializationConfig
>> at
>> com.fasterxml.jackson.databind.ObjectMapper.(ObjectMapper.java:558)
>> at
>> com.fasterxml.jackson.databind.ObjectMapper.(ObjectMapper.java:474)
>> at
>> com.mycom.datapipeline.common.client.UmsCientFactory.getUser
>> MappingServiceClient(UmsCientFactory.java:31)
>> at
>> com.mycom.datapipeline.flink.udf.UserLookupFunctionBase.open
>> (UserLookupFunctionBase.java:78)
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.ope
>> nFunction(FunctionUtils.java:36)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOp
>> erator.open(AbstractUdfStreamOperator.java:102)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllO
>> perators(StreamTask.java:393)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:254)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> I understand why this is happening in the case of the parent-first
>> classloader, but I can't understand why it's happening when I exclude
>> flink-java from my job's uber jar file -- in that 2nd case, I would expect
>> the job's child classloader to be used, which contains both of those
>> fasterxml classes.
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>


Re: Error with Avro/Kyro after upgrade to Flink 1.4

2018-01-24 Thread Stephan Ewen
Hi!

I have a patch coming up for the VerifyError.

Concerning the Jackson Error - did you build Flink yourself? It looks like
flink-dist was not built properly, the shading is incorrect.
The class "com.fasterxml.jackson.databind.ObjectMapper" should not be in
the jar in an unshaded manner.

My first guess is that you built Flink yourself with a Maven version >= 3.3.0.

Please see here:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/start/building.html#dependency-shading

Stephan


On Tue, Jan 23, 2018 at 5:21 PM, Edward  wrote:

> Thanks for the follow-up Stephan.
>
> I have been running this job from a built jar file which was submitted to
> an
> existing Flink 1.4 cluster, not from within the IDE. Interestingly, I am
> now
> getting the same error when any of the following 3 conditions are true:
> 1. I run the job on a local cluster from within my IDE
> 2. I run the job on a cluster where "classloader.resolve-order:
> parent-first"
> 3. I build the uber jar file without including flink-java,
> flink-streaming-java and flink-clients (I changed those to "provided" as
> you
> suggested, so they aren't in my jar)
>
> If any of those 3 cases are true, I get a new NoClassDefFoundError. This
> error is caused because com.fasterxml.jackson.databind.ObjectMapper is
> present in flink-dist_2.11-1.4.0.jar, but
> com.fasterxml.jackson.databind.SerializationConfig is not (only the shaded
> version:
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.
> SerializationConfig)
>
> java.lang.NoClassDefFoundError: Could not initialize class
> com.fasterxml.jackson.databind.SerializationConfig
> at
> com.fasterxml.jackson.databind.ObjectMapper.(ObjectMapper.java:558)
> at
> com.fasterxml.jackson.databind.ObjectMapper.(ObjectMapper.java:474)
> at
> com.mycom.datapipeline.common.client.UmsCientFactory.
> getUserMappingServiceClient(UmsCientFactory.java:31)
> at
> com.mycom.datapipeline.flink.udf.UserLookupFunctionBase.
> open(UserLookupFunctionBase.java:78)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(
> AbstractUdfStreamOperator.java:102)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:393)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
>
> I understand why this is happening in the case of the parent-first
> classloader, but I can't understand why it's happening when I exclude
> flink-java from my job's uber jar file -- in that 2nd case, I would expect
> the job's child classloader to be used, which contains both of those
> fasterxml classes.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


some question about isChainable

2018-01-24 Thread Marvin777
Hi all:

We all know that there are 9 conditions for chaining, like this:
[inline image omitted: table of the chaining conditions]
However, I have a question about conditions 2, 3 and 8 in the figure above.
As far as I know, the operator cannot be null, and if the partitioner of
the edge is forward, then their parallelism must be the same. Maybe I'm
completely missing something...

Waiting for your reply.

Best regard.


Re: Event time window questions

2018-01-24 Thread Sendoh
setAutoWatermarkInterval configures how often the watermark is emitted.
So if the watermark is not advancing and you set a shorter interval, you would
just see t1, t1, t1, t1, t1 more often, whereas what you would like to see is
t1, t2, t3, t4.

If you want to see a count of 0 when there are no incoming events - which
sounds like your use case - you can check out sliding windows.

I think seeing the watermark in the UI is possible now, or you can use debug
mode to see it.

The watermark assigner you use won't wait for all topics (partitions). That is
possible if you implement your own watermark logic.
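
For reference, these knobs fit together roughly like this (a minimal sketch; the
extractor, key, and window sizes are placeholders):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // How often watermarks are emitted; it does not make a stalled watermark advance.
        env.getConfig().setAutoWatermarkInterval(1000);

        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("a", 1_000L), Tuple2.of("a", 2_000L), Tuple2.of("b", 3_000L));

        events
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> e) {
                        return e.f1;   // event time carried in the record
                    }
                })
            .keyBy(0)
            // Each event falls into size/slide overlapping windows, so results keep
            // coming for a while after events stop (truly empty windows still do not fire).
            .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)))
            .sum(1)
            .print();

        env.execute("event-time-sketch");
    }
}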

Cheers,

Sendoh



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/