Flink on K8s job submission best practices

2017-12-22 Thread Maximilian Bode
Hi everyone,

We are beginning to run Flink on K8s and found the basic templates [1]
as well as the example Helm chart [2] very helpful. Also the discussion
about JobManager HA [3] and Patrick's talk [4] were very interesting. All
in all, it is delightful how easily everything can be set up and how well
it works out of the box.

Now we are looking for some best practices as far as job submission is
concerned. Having played with a few alternative options, we would like
to get some input on what other people are using. What we have looked
into so far:

 1. Packaging the job jar into e.g. the JM image and submitting manually
(either from the UI or via `kubectl exec`). Ideally, we would like
to establish a more automated setup, preferably using native
Kubernetes objects.
 2. Building a separate image whose responsibility it is to submit the
job and keep it running. This could either use the REST API [5] or share
the Flink config so that CLI calls connect to the existing cluster.
When this client is scheduled as a Kubernetes deployment [6] and, e.g.,
the node running the client pod fails, one ends up with duplicate jobs.
One could build custom logic (poll whether the job exists, only submit if
it does not; see the sketch after this list), but this seems fragile and
it is conceivable that this could lead to weird timing issues like
different containers trying to submit at the same time. One solution
would be to implement an atomic submit-if-not-exists, but I suppose this
would need to involve some level of locking on the JM.
 3. Schedule the client container from the step above as a Kubernetes
job [7]. This seems somewhat unidiomatic for streaming jobs that are
not expected to terminate, but one would not have to deal with
duplicate Flink jobs. In the failure scenario described above, the
(Flink) job would still be running on the Flink cluster, there just
would not be a client attached to it (as the Kubernetes job would
not be restarted). On the other hand, should the (Flink) job fail
for some reason, there is no way to restart it automatically.
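To make the timing problem in option 2 concrete, below is a rough, untested
sketch of such a poll-then-submit client in Java (referenced from the list
above). The JobManager service name, the REST path from [5] and the CLI/jar
locations are assumptions about a typical setup, and the check-then-submit
sequence is exactly the non-atomic logic whose race window is described in
option 2.

// Sketch only: endpoint path and file locations are assumptions, not a tested recipe.
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Scanner;

public class SubmitIfAbsent {

    // Assumed JobManager service name inside the cluster.
    private static final String JM_REST = "http://flink-jobmanager:8081";

    public static void main(String[] args) throws Exception {
        String jobName = "my-streaming-job"; // hypothetical job name
        if (isJobRunning(jobName)) {
            System.out.println("Job already running, nothing to submit.");
            return;
        }
        // Submission via the CLI that shares the cluster's flink-conf.yaml;
        // it could equally well go through the REST API [5].
        new ProcessBuilder("/opt/flink/bin/flink", "run", "-d", "/opt/job/my-job.jar")
                .inheritIO()
                .start()
                .waitFor();
    }

    // Naive check: does any currently running job carry the expected name?
    private static boolean isJobRunning(String jobName) throws Exception {
        HttpURLConnection conn =
                (HttpURLConnection) new URL(JM_REST + "/joboverview/running").openConnection();
        try (InputStream in = conn.getInputStream();
             Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            String body = scanner.hasNext() ? scanner.next() : "";
            return body.contains("\"name\":\"" + jobName + "\"");
        } finally {
            conn.disconnect();
        }
    }
}

This still leaves a window in which two such clients submit concurrently, which
is exactly why we are asking for a more idiomatic pattern.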

Are we missing something obvious? Has the Flink community come up with a
default way of submitting Flink jobs on Kubernetes yet or are there
people willing to share their experiences?

Best regards and happy holidays,
Max

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/kubernetes.html
[2] https://github.com/docker-flink/examples/tree/master/helm/flink
[3]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-HA-with-Kubernetes-without-Zookeeper-td15033.html
[4] https://www.youtube.com/watch?v=w721NI-mtAA Slides:
https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-patrick-lucas-flink-in-containerland
[5]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#submitting-programs
[6] https://kubernetes.io/docs/concepts/workloads/controllers/deployment/
[7]
https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/
-- 
Maximilian Bode * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: Get EOF from PrometheusReporter in JM

2017-10-23 Thread Maximilian Bode
Hi Tony,

thanks for troubleshooting this. I have added a commit to
https://github.com/apache/flink/pull/4586 that should enable you to use
the reporter with 1.3.2 as well.

Best regards,
Max

> Tony Wei 
> 23 September 2017 at 13:11
> Hi Chesnay,
>
> I built another flink cluster using version 1.4, set the log level to
> DEBUG, and I found that the root cause might be this
> exception: *java.lang.NullPointerException: Value returned by gauge
> lastCheckpointExternalPath was null*.
>
> I updated `CheckpointStatsTracker` to ignore the external path when it is
> null, and this exception didn't happen again. The Prometheus reporter
> now works as well.
>
> I have created a Jira issue for
> it: https://issues.apache.org/jira/browse/FLINK-7675, and I will submit
> the PR once the Travis CI build for my repository passes.
>
> Best Regards,
> Tony Wei
>
>  
>
>
> Tony Wei 
> 22 September 2017 at 16:20
> Hi Chesnay,
>
> I didn't try it on 1.4, so I have no idea if this also occurs there.
> As for my logging settings, the level is already set to INFO, but
> there wasn't any error or warning in the log file either.
>
> Best Regards,
> Tony Wei
>
>
> Chesnay Schepler 
> 22 September 2017 at 16:07
> The Prometheus reporter should work with 1.3.2.
>
> Does this also occur with the reporter that currently exists in 1.4?
> (to rule out new bugs from the PR).
>
> To investigate this further, please set the logging level to WARN and
> try again, as all errors in the metric system are logged on that level.
>
> On 22.09.2017 10:33, Tony Wei wrote:
>
>
> Tony Wei 
> 22 September 2017 at 10:33
> Hi, 
>
> I have built the Prometheus reporter package from this
> PR https://github.com/apache/flink/pull/4586, and used it on Flink
> 1.3.2 to record all the default metrics and those from `FlinkKafkaConsumer`.
>
> Originally, everything was fine. I could get the TM metrics from
> Prometheus just like I saw them on the Flink Web UI.
> However, when I turned to the JM, I found Prometheus giving me this
> error: Get http://localhost:9249/metrics: EOF.
> I checked the log on the JM and saw nothing in it. There was no error
> message and port 9249 was still alive.
>
> To figure out what happened, I created another cluster and found that
> Prometheus could connect to the Flink cluster as long as there was no running job.
> After the JM triggered or completed the first checkpoint, Prometheus
> started getting ERR_EMPTY_RESPONSE from the JM, but not from the TM. There was
> still no error in the log file and port 9249 was still alive.
>
> I was wondering where the error occurred: in Flink or in the Prometheus reporter?
> Or is it incorrect to use the Prometheus reporter on Flink 1.3.2? Thank you.
>
> Best Regards,
> Tony Wei
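To illustrate the failure mode diagnosed above (a Gauge whose getValue()
returns null makes the reporter fail while the metric is read), a defensive
wrapper along the following lines, purely illustrative and not part of Flink,
would avoid handing null to a reporter:

// Illustrative only: wraps any Gauge and substitutes a fallback value for null.
import org.apache.flink.metrics.Gauge;

public class NullSafeGauge<T> implements Gauge<T> {

    private final Gauge<T> delegate;
    private final T fallback;

    public NullSafeGauge(Gauge<T> delegate, T fallback) {
        this.delegate = delegate;
        this.fallback = fallback;
    }

    @Override
    public T getValue() {
        T value = delegate.getValue();
        return value != null ? value : fallback; // never hand null to the reporter
    }
}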


signature.asc
Description: OpenPGP digital signature


Re: flink testing

2017-04-23 Thread Maximilian Bode
Hi Georg,

Have a look at (Streaming)MultipleProgramsTestBase 
(https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
 | 
https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java).
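A minimal sketch of what such a test can look like; only the base class and
its package come from the links above (it lives in the flink-test-utils
module), the job itself is a throwaway example:

// The base class starts a local Flink mini cluster once for all tests in the class,
// so no manual setup/teardown of an execution environment is needed.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.junit.Test;

public class WordLengthTest extends StreamingMultipleProgramsTestBase {

    @Test
    public void mapsWordsToLengths() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("flink", "spark")
           .map(new MapFunction<String, Integer>() {
               @Override
               public Integer map(String value) {
                   return value.length();
               }
           })
           .print(); // a real test would collect into a test sink and assert on the result

        env.execute();
    }
}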

Best regards,
Max

> On 23.04.2017, at 17:19, Georg Heiler wrote:
> 
> Spark testing base (https://github.com/holdenk/spark-testing-base) offers some 
> base classes to use when writing tests with Spark, which make it simpler to 
> write unit tests for functions in Spark, i.e. you do not manually need to 
> instantiate a Spark context / Flink execution environment for each test case.
> 
> You've written an awesome program in Spark and now its time to write some 
> tests. Only you find yourself writing the code to setup and tear down local 
> mode Spark in between each suite and you say to your self: This is not my 
> beautiful code.
> 
> 
> 
> 
> Ted Yu  schrieb am So., 23. Apr. 2017 um 10:46 Uhr:
>> Please give more context by describing what spark-testing-base does :-)
>> 
>> > On Apr 22, 2017, at 10:57 PM, Georg Heiler  
>> > wrote:
>> >
>> > Hi,
>> >
>> > is there something like spark-testing-base for flink as well?
>> >
>> > Cheers,
>> > Georg


Re: Daily/hourly TumblingEventTimeWindows

2016-09-22 Thread Maximilian Bode
I have just noticed that this is exactly what it currently does. Reading the 
docs, I had assumed all windows would be of the same size.
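For reference, a small sketch of what we ended up relying on: since windows are
aligned to the epoch, a plain daily tumbling window already tumbles at 00:00
(UTC). Class locations may differ slightly between Flink versions; the data and
aggregation below are placeholders.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class DailyWindowExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.fromElements(
                new Tuple2<>("key", 1000L),      // shortly after the epoch
                new Tuple2<>("key", 86400005L))  // just after the first midnight boundary
           .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
               @Override
               public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                   return element.f1;
               }
           })
           .keyBy(0)
           // Windows are aligned to the epoch, so the daily window tumbles at 00:00 UTC;
           // if the stream starts mid-day, the first window is simply shorter than the rest.
           .window(TumblingEventTimeWindows.of(Time.days(1)))
           .sum(1)
           .print();

        env.execute("daily tumbling windows");
    }
}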

> On 22.09.2016, at 13:35, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
> 
> Hi everyone,
> 
> is there an easy way to implement a tumbling event time window that tumbles 
> at a certain time? Examples could be daily or hourly (tumbling at exactly 
> 00:00, 01:00, 02:00 etc.) windows.
> 
> So in particular, for a daily window, the first window would be shorter than 
> the rest, tumble at midnight and after that it would basically be the same as 
> a regular 24h TumblingEventTimeWindow.
> 
> Cheers,
> Max



Daily/hourly TumblingEventTimeWindows

2016-09-22 Thread Maximilian Bode
Hi everyone,

is there an easy way to implement a tumbling event time window that tumbles at 
a certain time? Examples could be daily or hourly (tumbling at exactly 00:00, 
01:00, 02:00 etc.) windows.

So in particular, for a daily window, the first window would be shorter than 
the rest, tumble at midnight and after that it would basically be the same as a 
regular 24h TumblingEventTimeWindow.

Cheers,
Max

Prevent job/operator from spilling to disk

2016-05-04 Thread Maximilian Bode
Hi everyone,

is there a way to prevent operators from spilling to disk? If not, would it be 
conceivable to make this configurable either per job or operator?

The use case is a batch application with the formal requirement not to persist 
in-flight data to disk (even temporarily) so it would be preferable to see the 
job fail and then be able to grant sufficient memory and run it again.

Cheers,
Max

— 
Maximilian Bode * Software Consultant * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082



OOME PermGen in URLClassLoader

2016-04-15 Thread Maximilian Bode
.(TaskExecutionState.java:108)
at 
org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:78)
at 
org.apache.flink.runtime.taskmanager.Task.notifyObservers(Task.java:865)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:616)
at java.lang.Thread.run(Thread.java:744)
2016-04-14 16:53:55,489 ERROR org.apache.flink.runtime.taskmanager.Task 
- FATAL - exception in task exception handler
java.lang.OutOfMemoryError: PermGen space
at sun.misc.Unsafe.defineClass(Native Method)
at sun.reflect.ClassDefiner.defineClass(ClassDefiner.java:63)
at 
sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)
at 
sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
at java.security.AccessController.doPrivileged(Native Method)
at 
sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)
at 
sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)
at 
sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)
at 
java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
at java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:464)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:464)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:464)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
at 
org.apache.flink.runtime.util.SerializedThrowable.<init>(SerializedThrowable.java:83)
at 
org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:108)
at 
org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:78)
at 
org.apache.flink.runtime.taskmanager.Task.notifyObservers(Task.java:865)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:616)
at java.lang.Thread.run(Thread.java:744)

--
This problem seems to be reproducible. In the first run it happens towards the 
end of the job in a JDBCOutputFormat. From then on, an analogous exception is 
thrown in the JDBCInputFormat, an earlier operator.

We suspect there might be a memory leak caused by the classloader; any ideas?

Best regards,
Max

—
Maximilian Bode * Software Consultant * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082



signature.asc
Description: Message signed with OpenPGP using GPGMail


Re: Jobmanager HA with Rolling Sink in HDFS

2016-03-08 Thread Maximilian Bode
Hi Aljoscha,

yeah, I should have been clearer. I did mean those accumulators, but I am not 
trusting them as a total count (as you said, they are reset on 
failure). On the other hand, if they do not change for a while, it is pretty 
obvious that the job has ingested everything in the queue. But you are right, 
this is kind of a heuristic. In combination with the fact that the 
DateTimeBucketer does not create new folders, I believe this should be 
sufficient to decide when the job has basically finished, though.

So the setup is the following: the Flink job consists of a 
FlinkKafkaConsumer08, a map containing just an IntCounter accumulator (see the 
sketch below) and finally a rolling sink writing to HDFS. I start it in a 
per-job YARN session with n=3, s=4. Then I pour 2 million records into the 
Kafka queue the application is reading from. If no job/task managers are 
killed, the behavior is exactly as expected: the output files in HDFS grow over 
time and I can monitor exactly via the accumulator when every record has been 
ingested from Kafka. After that, I give the job a few seconds and then cancel 
it via the web interface. Still some time later (to give the job the chance to 
output the few records still hanging around), a wc -l on the output files 
yields exactly the expected 2 million.

On the other hand, if I kill a task manager while the job is in progress, one 
of the 12 output files seems to be missing as described before. A wc -l on only 
the relevant bytes as I mentioned in an earlier mail then leads to a number 
smaller than 2 million.

We are using an FsStateBackend in HDFS with a checkpoint interval of 10s.
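For completeness, the counting map mentioned above is nothing more than a
RichMapFunction that registers an IntCounter, roughly like this (class and
accumulator names are illustrative):

import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Passes records through unchanged and counts them via an accumulator.
public class CountingMap<T> extends RichMapFunction<T, T> {

    private final IntCounter records = new IntCounter();

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().addAccumulator("records-seen", records);
    }

    @Override
    public T map(T value) {
        records.add(1); // visible live in the web UI, but reset when the job restarts
        return value;
    }
}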

Cheers,
 Max
—
Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

> On 08.03.2016, at 17:46, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi,
> with accumulators you mean the ones you get from 
> RuntimeContext.addAccumulator/getAccumulator? I’m afraid these are not 
> fault-tolerant, which means that the count in them probably doesn’t reflect 
> the actual number of elements that were processed. When a job fails and 
> restarts, the accumulators should start from scratch. This makes me wonder how 
> yours ever reach the required 2 million for the job to be considered “done”.
> 
> This keeps getting more mysterious…
> 
> By the way, what are you using as StateBackend and checkpoint interval?
> 
> Cheers,
> Aljoscha
>> On 08 Mar 2016, at 13:38, Maximilian Bode <maximilian.b...@tngtech.com> 
>> wrote:
>> 
>> Hi,
>> thanks for the fast answer. Answers inline.
>> 
>>> On 08.03.2016, at 13:31, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> 
>>> Hi,
>>> a missing part file for one of the parallel sinks is not necessarily a 
>>> problem. This can happen if that parallel instance of the sink never 
>>> received data after the job successfully restarted.
>>> 
>>> Missing data, however, is a problem. Maybe I need some more information 
>>> about your setup:
>>> 
>>> - When are you inspecting the part files?
>> Some time after the cluster is shut down
>>> - Do you shutdown the Flink Job before checking? If so, how do you shut it 
>>> down.
>> Via 'cancel' in the Jobmanager Web Interface. Some records seem to be 
>> written only after cancelling the job, right?
>>> - When do you know whether all the data from Kafka was consumed by Flink 
>>> and has passed through the pipeline into HDFS?
>> I have an accumulator in a map right before writing into HDFS. Also, the 
>> RollingSink has a DateTimeBucketer which makes it transparent when no new 
>> data is arriving anymore, as the last bucket is from some minutes ago.
>>> 
>>> Cheers,
>>> Aljoscha
>>>> On 08 Mar 2016, at 13:19, Maximilian Bode <maximilian.b...@tngtech.com> 
>>>> wrote:
>>>> 
>>>> Hi Aljoscha,
>>>> 
>>>> oh I see. I was under the impression this file was used internally and that 
>>>> the output would be completed at the end. Ok, so I extracted the relevant lines 
>>>> using
>>>>for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > 
>>>> "$i.final"; done
>>>> which seems to do the trick.
>>>> 
>>>> Unfortunately, now some records are missing again. In particular, there 
>>>> are the files
>>>>part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding 
>>>> .valid-length files
>>>>

Re: Jobmanager HA with Rolling Sink in HDFS

2016-03-08 Thread Maximilian Bode
Hi,
thanks for the fast answer. Answers inline.

> On 08.03.2016, at 13:31, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi,
> a missing part file for one of the parallel sinks is not necessarily a 
> problem. This can happen if that parallel instance of the sink never received 
> data after the job successfully restarted.
> 
> Missing data, however, is a problem. Maybe I need some more information about 
> your setup:
> 
> - When are you inspecting the part files?
Some time after the cluster is shut down
> - Do you shutdown the Flink Job before checking? If so, how do you shut it 
> down.
Via 'cancel' in the Jobmanager Web Interface. Some records seem to be written 
only after cancelling the job, right?
> - When do you know whether all the data from Kafka was consumed by Flink and 
> has passed through the pipeline into HDFS?
I have an accumulator in a map right before writing into HDFS. Also, the 
RollingSink has a DateTimeBucketer which makes it transparent when no new data 
is arriving anymore, as the last bucket is from some minutes ago.
> 
> Cheers,
> Aljoscha
>> On 08 Mar 2016, at 13:19, Maximilian Bode <maximilian.b...@tngtech.com> 
>> wrote:
>> 
>> Hi Aljoscha,
>> 
>> oh I see. I was under the impression this file was used internally and that 
>> the output would be completed at the end. Ok, so I extracted the relevant lines 
>> using
>>  for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > 
>> "$i.final"; done
>> which seems to do the trick.
>> 
>> Unfortunately, now some records are missing again. In particular, there are 
>> the files
>>  part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding 
>> .valid-length files
>>  part-0-1, part-1-1, ..., part-10-0
>> in the bucket, where job parallelism=12. So it looks to us as if one of the 
>> files was not even created in the second attempt. This behavior seems to be 
>> somewhat reproducible; cf. my earlier email, where the part-11 file 
>> disappeared as well.
>> 
>> Thanks again for your help.
>> 
>> Cheers,
>> Max
>> —
>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>> 
>>> On 08.03.2016, at 11:05, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> 
>>> Hi,
>>> are you taking the “.valid-length” files into account. The problem with 
>>> doing “exactly-once” with HDFS is that before Hadoop 2.7 it was not 
>>> possible to truncate files. So the trick we’re using is to write the length 
>>> up to which a file is valid if we would normally need to truncate it. (If 
>>> the job fails in the middle of writing the output files have to be 
>>> truncated to a valid position.) For example, say you have an output file 
>>> part-8-0. Now, if there exists a file part-8-0.valid-length this file tells 
>>> you up to which position the file part-8-0 is valid. So you should only 
>>> read up to this point.
>>> 
>>> The name of the “.valid-length” suffix can also be configured, by the way, 
>>> as can all the other stuff.
>>> 
>>> If this is not the problem then I definitely have to investigate further. 
>>> I’ll also look into the Hadoop 2.4.1 build problem.
>>> 
>>> Cheers,
>>> Aljoscha
>>>> On 08 Mar 2016, at 10:26, Maximilian Bode <maximilian.b...@tngtech.com> 
>>>> wrote:
>>>> 
>>>> Hi Aljoscha,
>>>> thanks again for getting back to me. I built from your branch and the 
>>>> exception is not occurring anymore. The RollingSink state can be restored.
>>>> 
>>>> Still, the exactly-once guarantee seems not to be fulfilled, there are 
>>>> always some extra records after killing either a task manager or the job 
>>>> manager. Do you have an idea where this behavior might be coming from? (I 
>>>> guess concrete numbers will not help greatly as there are so many 
>>>> parameters influencing them. Still, in our test scenario, we produce 2 
>>>> million records in a Kafka queue but in the final output files there are 
>>>> on the order of 2.1 million records, so a 5% error. The job is running in 
>>>> a per-job YARN session with n=3, s=4 with a checkpointing interval of 10s.)
>>>> 
>>>> On another (maybe unrelated) note: wh

Re: Jobmanager HA with Rolling Sink in HDFS

2016-03-08 Thread Maximilian Bode
Hi Aljoscha,

oh I see. I was under the impression this file was used internally and that the 
output would be completed at the end. Ok, so I extracted the relevant lines using
for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > 
"$i.final"; done
which seems to do the trick.

Unfortunately, now some records are missing again. In particular, there are the 
files
part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding 
.valid-length files
part-0-1, part-1-1, ..., part-10-0
in the bucket, where job parallelism=12. So it looks to us as if one of the 
files was not even created in the second attempt. This behavior seems to be 
somewhat reproducible; cf. my earlier email, where the part-11 file 
disappeared as well.

Thanks again for your help.

Cheers,
 Max
—
Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

> On 08.03.2016, at 11:05, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi,
> are you taking the “.valid-length” files into account. The problem with doing 
> “exactly-once” with HDFS is that before Hadoop 2.7 it was not possible to 
> truncate files. So the trick we’re using is to write the length up to which a 
> file is valid if we would normally need to truncate it. (If the job fails in 
> the middle of writing the output files have to be truncated to a valid 
> position.) For example, say you have an output file part-8-0. Now, if there 
> exists a file part-8-0.valid-length this file tells you up to which position 
> the file part-8-0 is valid. So you should only read up to this point.
> 
> The name of the “.valid-length” suffix can also be configured, by the way, as 
> can all the other stuff.
> 
> If this is not the problem then I definitely have to investigate further. 
> I’ll also look into the Hadoop 2.4.1 build problem.
> 
> Cheers,
> Aljoscha
>> On 08 Mar 2016, at 10:26, Maximilian Bode <maximilian.b...@tngtech.com> 
>> wrote:
>> 
>> Hi Aljoscha,
>> thanks again for getting back to me. I built from your branch and the 
>> exception is not occurring anymore. The RollingSink state can be restored.
>> 
>> Still, the exactly-once guarantee seems not to be fulfilled, there are 
>> always some extra records after killing either a task manager or the job 
>> manager. Do you have an idea where this behavior might be coming from? (I 
>> guess concrete numbers will not help greatly as there are so many parameters 
>> influencing them. Still, in our test scenario, we produce 2 million records 
>> in a Kafka queue but in the final output files there are on the order of 2.1 
>> million records, so a 5% error. The job is running in a per-job YARN session 
>> with n=3, s=4 with a checkpointing interval of 10s.)
>> 
>> On another (maybe unrelated) note: when I pulled your branch, the Travis 
>> build did not go through for -Dhadoop.version=2.4.1. I have not looked into 
>> this further as of now, is this one of the tests known to fail sometimes?
>> 
>> Cheers,
>> Max
>> 
>> —
>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>> 
>>> On 07.03.2016, at 17:20, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> 
>>> Hi Maximilian,
>>> sorry for the delay, we were very busy with the release last week. I had a 
>>> hunch about the problem but I think I found a fix now. The problem is in 
>>> snapshot restore. When restoring, the sink tries to clean up any files that 
>>> were previously in progress. If Flink restores to the same snapshot twice 
>>> in a row, then it will try to clean up the leftover files twice, but they are 
>>> not there anymore; this causes the exception.
>>> 
>>> I have a fix in my branch: 
>>> https://github.com/aljoscha/flink/tree/rolling-sink-fix
>>> 
>>> Could you maybe try if this solves your problem? Which version of Flink are 
>>> you using? You would have to build from source to try it out. Alternatively 
>>> I could build it and put it onto a maven snapshot repository for you to try 
>>> it out.
>>> 
>>> Cheers,
>>> Aljoscha
>>>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>> 
>>>>

Re: Jobmanager HA with Rolling Sink in HDFS

2016-03-07 Thread Maximilian Bode
Hi Aljoscha,

thank you very much, I will try whether this fixes the problem and get back to 
you. I am using 1.0.0 as of today :)

Cheers,
 Max
—
Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

> On 07.03.2016, at 17:20, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi Maximilian,
> sorry for the delay, we were very busy with the release last week. I had a 
> hunch about the problem but I think I found a fix now. The problem is in 
> snapshot restore. When restoring, the sink tries to clean up any files that 
> were previously in progress. If Flink restores to the same snapshot twice in 
> a row, then it will try to clean up the leftover files twice, but they are not 
> there anymore; this causes the exception.
> 
> I have a fix in my branch: 
> https://github.com/aljoscha/flink/tree/rolling-sink-fix
> 
> Could you maybe try if this solves your problem? Which version of Flink are 
> you using? You would have to build from source to try it out. Alternatively I 
> could build it and put it onto a maven snapshot repository for you to try it 
> out.
> 
> Cheers,
> Aljoscha
>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <aljos...@apache.org> wrote:
>> 
>> Hi,
>> did you check whether there are any files at your specified HDFS output 
>> location? If yes, which files are there?
>> 
>> Cheers,
>> Aljoscha
>>> On 03 Mar 2016, at 14:29, Maximilian Bode <maximilian.b...@tngtech.com> 
>>> wrote:
>>> 
>>> Just for the sake of completeness: this also happens when killing a task 
>>> manager and is therefore probably unrelated to job manager HA.
>>> 
>>>> On 03.03.2016, at 14:17, Maximilian Bode 
>>>> <maximilian.b...@tngtech.com> wrote:
>>>> 
>>>> Hi everyone,
>>>> 
>>>> unfortunately, I am running into another problem trying to establish 
>>>> exactly once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
>>>> 
>>>> When using
>>>> 
>>>> RollingSink<Tuple3<Integer,Integer,String>> sink = new 
>>>> RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>>> sink.setBucketer(new NonRollingBucketer());
>>>> output.addSink(sink);
>>>> 
>>>> and then killing the job manager, the new job manager is unable to restore 
>>>> the old state throwing
>>>> ---
>>>> java.lang.Exception: Could not restore checkpointed state to operators and 
>>>> functions
>>>>at 
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>>>at 
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>at java.lang.Thread.run(Thread.java:744)
>>>> Caused by: java.lang.Exception: Failed to restore state to function: 
>>>> In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 
>>>> was neither moved to pending nor is still in progress.
>>>>at 
>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>at 
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>>>... 3 more
>>>> Caused by: java.lang.RuntimeException: In-Progress file 
>>>> hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved 
>>>> to pending nor is still in progress.
>>>>at 
>>>> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>>>at 
>>>> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>>>at 
>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>>>... 4 more
>>>> ---
>>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using 
>>>> 2.4.0 – might this be the same issue?
>>>> 
>>>> Another thing I could think of is that the job is not configured correctly 
>>>> and there is some sort of timing issue. The checkpoint interval is 10 
>>>> seconds, everything else was left at default value. Then again, as the 
>>>> NonRollingBucketer is used, there should not be any timing issues, right?
>>>> 
>>>> Cheers,
>>>> Max
>>>> 
>>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
>>>> 
>>>> —
>>>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>> 
>>> 
>> 
> 



signature.asc
Description: Message signed with OpenPGP using GPGMail


Re: Jobmanager HA with Rolling Sink in HDFS

2016-03-03 Thread Maximilian Bode
   12176987 2016-03-03 14:52 
/hdfs/dir/outbound/part-5-0
-rw-r--r--   2 user hadoop   12165782 2016-03-03 14:52 
/hdfs/dir/outbound/part-6-0
-rw-r--r--   2 user hadoop9474037 2016-03-03 14:52 
/hdfs/dir/outbound/part-7-0
-rw-r--r--   2 user hadoop   12136347 2016-03-03 14:52 
/hdfs/dir/outbound/part-8-0
-rw-r--r--   2 user hadoop   12305943 2016-03-03 14:52 
/hdfs/dir/outbound/part-9-0

Can you see from this what is going wrong?

Cheers,
 Max
—
Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

> On 03.03.2016, at 14:50, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi,
> did you check whether there are any files at your specified HDFS output 
> location? If yes, which files are there?
> 
> Cheers,
> Aljoscha
>> On 03 Mar 2016, at 14:29, Maximilian Bode <maximilian.b...@tngtech.com> 
>> wrote:
>> 
>> Just for the sake of completeness: this also happens when killing a task 
>> manager and is therefore probably unrelated to job manager HA.
>> 
>>> On 03.03.2016, at 14:17, Maximilian Bode 
>>> <maximilian.b...@tngtech.com> wrote:
>>> 
>>> Hi everyone,
>>> 
>>> unfortunately, I am running into another problem trying to establish 
>>> exactly once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
>>> 
>>> When using
>>> 
>>> RollingSink<Tuple3<Integer,Integer,String>> sink = new 
>>> RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>> sink.setBucketer(new NonRollingBucketer());
>>> output.addSink(sink);
>>> 
>>> and then killing the job manager, the new job manager is unable to restore 
>>> the old state throwing
>>> ---
>>> java.lang.Exception: Could not restore checkpointed state to operators and 
>>> functions
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> at java.lang.Thread.run(Thread.java:744)
>>> Caused by: java.lang.Exception: Failed to restore state to function: 
>>> In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was 
>>> neither moved to pending nor is still in progress.
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>> ... 3 more
>>> Caused by: java.lang.RuntimeException: In-Progress file 
>>> hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to 
>>> pending nor is still in progress.
>>> at 
>>> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>> at 
>>> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>> ... 4 more
>>> ---
>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using 
>>> 2.4.0 – might this be the same issue?
>>> 
>>> Another thing I could think of is that the job is not configured correctly 
>>> and there is some sort of timing issue. The checkpoint interval is 10 
>>> seconds, everything else was left at default value. Then again, as the 
>>> NonRollingBucketer is used, there should not be any timing issues, right?
>>> 
>>> Cheers,
>>> Max
>>> 
>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
>>> 
>>> —
>>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>> 
>> 
> 



signature.asc
Description: Message signed with OpenPGP using GPGMail


Re: Jobmanager HA with Rolling Sink in HDFS

2016-03-03 Thread Maximilian Bode
Just for the sake of completeness: this also happens when killing a task 
manager and is therefore probably unrelated to job manager HA.

> On 03.03.2016, at 14:17, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
> 
> Hi everyone,
> 
> unfortunately, I am running into another problem trying to establish exactly 
> once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
> 
> When using
> 
> RollingSink<Tuple3<Integer,Integer,String>> sink = new 
> RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
> sink.setBucketer(new NonRollingBucketer());
> output.addSink(sink);
> 
> and then killing the job manager, the new job manager is unable to restore 
> the old state throwing
> ---
> java.lang.Exception: Could not restore checkpointed state to operators and 
> functions
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.Exception: Failed to restore state to function: 
> In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 
>  was neither moved to 
> pending nor is still in progress.
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>   ... 3 more
> Caused by: java.lang.RuntimeException: In-Progress file 
> hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 
>  was neither moved to 
> pending nor is still in progress.
>   at 
> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>   at 
> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>   ... 4 more
> ---
> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using 
> 2.4.0 – might this be the same issue?
> 
> Another thing I could think of is that the job is not configured correctly 
> and there is some sort of timing issue. The checkpoint interval is 10 
> seconds, everything else was left at default value. Then again, as the 
> NonRollingBucketer is used, there should not be any timing issues, right?
> 
> Cheers,
>  Max
> 
> [1] https://issues.apache.org/jira/browse/FLINK-2979
> 
> —
> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> 



signature.asc
Description: Message signed with OpenPGP using GPGMail


Jobmanager HA with Rolling Sink in HDFS

2016-03-03 Thread Maximilian Bode
Hi everyone,

unfortunately, I am running into another problem trying to establish exactly 
once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).

When using

RollingSink<Tuple3<Integer,Integer,String>> sink = new 
RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
sink.setBucketer(new NonRollingBucketer());
output.addSink(sink);

and then killing the job manager, the new job manager is unable to restore the 
old state throwing
---
java.lang.Exception: Could not restore checkpointed state to operators and 
functions
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.Exception: Failed to restore state to function: 
In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was 
neither moved to pending nor is still in progress.
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
... 3 more
Caused by: java.lang.RuntimeException: In-Progress file 
hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to 
pending nor is still in progress.
at 
org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
at 
org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
... 4 more
---
I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using 
2.4.0 – might this be the same issue?

Another thing I could think of is that the job is not configured correctly and 
there is some sort of timing issue. The checkpoint interval is 10 seconds, 
everything else was left at default value. Then again, as the 
NonRollingBucketer is used, there should not be any timing issues, right?

Cheers,
 Max

[1] https://issues.apache.org/jira/browse/FLINK-2979

—
Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082



signature.asc
Description: Message signed with OpenPGP using GPGMail


Re: YARN JobManager HA using wrong network interface

2016-03-03 Thread Maximilian Bode
Hi Ufuk, Till and Stephan,

Yes, that is what we observed. The primary hostname, i.e. the one returned by 
the unix hostname command, is in fact bound to the eth0 interface, whereas 
Flink uses the eth1 interface (pertaining to another hostname).

Changing akka.lookup.timeout to 100 s seems to fix the problem, as the new job 
manager now becomes available in time. I would still agree with Stephan that 
taking the local hostname is the preferred strategy.

Cheers,
 Max
—
Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

> On 03.03.2016, at 12:29, Till Rohrmann <till.rohrm...@gmail.com> wrote:
> 
> No I don't think this behaviour has been introduced by HA. That is the 
> default behaviour we used for a long time. If you think we should still 
> change it, then I can open an issue for it.
> 
> On Thu, Mar 3, 2016 at 12:20 PM, Stephan Ewen <se...@apache.org> wrote:
> Okay, that is a change from the original behavior, introduced in HA. 
> Originally, if the connection attempts failed, it always returned the 
> InetAddress.getLocalHost() interface.
> I think we should change it back to that, because that interface is by far 
> the best possible heuristic.
> 
> On Thu, Mar 3, 2016 at 11:39 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> If I’m not mistaken, then it’s not necessarily true that the heuristic 
> returns InetAddress.getLocalHost() in all cases. The heuristic will select 
> the first network interface with the afore-mentioned conditions but before 
> returning it, it will try a last time to connect to the JM via the interface 
> bound to InetAddress.getLocalHost(). However, if this fails, then the 
> heuristically selected network interface will be returned.
> 
> 
> On Thu, Mar 3, 2016 at 10:49 AM, Stephan Ewen <se...@apache.org> wrote:
> If the TaskManager cannot connect to the JobManager, it will use the 
> interface that is bound to the machine's host name 
> ("InetAddress.getLocalHost()").
> 
> So, the best way to fix this would be to make sure that all machines have a 
> proper network configuration. Then Flink would either use an address that can 
> connect (via trying various interfaces), or it would default back to the 
> hostname/interface that is configured on the machine.
> 
> 
> On Thu, Mar 3, 2016 at 10:43 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> Hi Max,
> 
> the problem is that before starting the TM, we have to find the network 
> interface which is reachable by the other machines. So what we do is to 
> connect to the current JobManager. If it should happen, as in your case, that 
> the JobManager just died and the new JM address has not been written to 
> ZooKeeper, then the TMs don’t have much choice other than using the heuristic.
> 
> I can’t really tell why eth1 is chosen over eth0. The condition is that the 
> interface address is an Inet4Address and neither a link-local nor a loopback 
> address.
> 
> Thus, Ufuk’s suggestion to increase akka.lookup.timeout seems to be the 
> easiest way to solve your problem. I’ve checked: the default value is set 
> to 10 s, which might be a bit too low for restarting a new JM and publishing 
> its address via ZooKeeper.
> 
> Cheers,
> Till
> 
> 
> On Thu, Mar 3, 2016 at 10:28 AM, Ufuk Celebi <u...@apache.org> wrote:
> I had an offline chat with Till about this. He pointed out that the
> address is chosen once at start up time (while not being able to
> connect to the old job manager) and then it stays fixed at eth1.
> 
> You can increase the lookup timeout by setting akka.lookup.timeout to
> a higher value (like 100 s). This is the only workaroud I'm aware of
> at this point. Maybe Till can chime in here whether this has other
> implications as well?
> 
> – Ufuk
> 
> On Thu, Mar 3, 2016 at 9:59 AM, Ufuk Celebi <u...@apache.org> wrote:
> > Hey Max!
> >
> > for the first WARN in
> > org.apache.flink.runtime.webmonitor.JobManagerRetriever: this is
> > expected if the new leader has not updated ZooKeeper yet. The
> > important thing is that the new leading job manager is eventually
> > retrieved. This did happen, right?
> >
> > Regarding eth1 vs. eth0: After the new job manager becomes leader, the
> > task manager should re-try connecting to it with the same strategy as
> >

YARN JobManager HA using wrong network interface

2016-03-03 Thread Maximilian Bode
  org.apache.flink.runtime.net.ConnectionUtils  
- Failed to connect from address 
'task.manager.eth0.hostname.com/10.127.68.136': Connection refused
2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils  
- Failed to connect from address '/10.127.68.136': Connection 
refused
2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils  
- Failed to connect from address '/10.120.193.110': Connection 
refused
2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils  
- Failed to connect from address '/10.127.68.136': Connection 
refused
2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils  
- Failed to connect from address '/127.0.0.1': Connection refused
2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils  
- Failed to connect from address '/10.120.193.110': Connection 
refused
2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils  
- Failed to connect from address '/10.127.68.136': Connection 
refused
2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils  
- Failed to connect from address '/127.0.0.1': Connection refused
---
After five repetitions, the task manager stops trying to retrieve the leader 
and, using the HEURISTIC strategy, ends up using eth1 (10.120.193.110) from now 
on:
---
2016-03-02 18:01:23,650 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService.
2016-03-02 18:01:23,655 INFO  org.apache.zookeeper.ClientCnxn   
- EventThread shut down
2016-03-02 18:01:23,655 INFO  org.apache.zookeeper.ZooKeeper
- Session: 0x25229757cff035b closed
2016-03-02 18:01:23,664 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- TaskManager will use hostname/address 
'task.manager.eth1.hostname.com' (10.120.193.110) for communication.
---
Following this, the new jobmanager is discovered and the taskmanager is able to 
register with the jobmanager using eth1. The problem is that connections TO eth1 
are not possible, so Flink should always use eth0. The exception we later see 
is:
---
java.io.IOException: Connecting the channel failed: Connecting to remote task 
manager + 'other.task.manager.eth1.hostname/10.120.193.111:46620' has failed. 
This might indicate that the remote task manager has been lost.
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
at 
org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:388)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:411)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:744)
---
The root cause seems to be that network interface selection is still using the 
old jobmanager location and hence is not able to choose the right interface. In 
particular, it seems that iteration order over the network interfaces differs 
between the HEURISTIC and SLOW strategy, which then leads to the wrong 
interface being selected.

Cheers,
 Max
—
Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082



signature.asc
Description: Message signed with OpenPGP using GPGMail


Backpressure in the context of JDBCOutputFormat update

2016-01-21 Thread Maximilian Bode
Hi everyone,

in a Flink (0.10.1) job with two JDBCOutputFormat sinks, one of them (doing a 
database update) is performing slower than the other one (an insert). The job 
as a whole is also slow as upstream operators are slowed down due to 
backpressure. I am able to speed up the whole job by introducing an a priori 
unnecessary .distinct(), which of course blocks downstream execution of the 
slow sink, which in turn seems to be able to execute faster when given all data 
at once.

Any ideas what is going on here? Is there something I can do without 
introducing unnecessary computation steps?
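For reference, the rough shape of the job is sketched below: one upstream
DataSet feeding an INSERT sink and an UPDATE sink via JDBCOutputFormat, with
the extra .distinct() acting as the barrier described above. Driver, URL,
credentials and SQL are placeholders, and the builder usage follows the
flink-jdbc API of the 0.10 line, where the output format still consumes Tuples.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

public class InsertAndUpdateJob {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the real upstream operators; fields are (payload, key).
        DataSet<Tuple2<String, Long>> data = env.fromElements(
                new Tuple2<>("a", 1L), new Tuple2<>("b", 2L));

        // Fast sink: plain insert, behind a distinct() on the key because of a
        // unique-key constraint in the target table.
        data.distinct(1)
            .output(jdbcSink("INSERT INTO target (payload, id) VALUES (?, ?)"));

        // Slow sink: update by primary key. Adding a second distinct() here is the
        // "a priori unnecessary" barrier that, counter-intuitively, made the whole
        // job finish faster in our tests.
        data.output(jdbcSink("UPDATE target SET payload = ? WHERE id = ?"));

        env.execute("insert + update");
    }

    private static JDBCOutputFormat jdbcSink(String query) {
        return JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("oracle.jdbc.driver.OracleDriver")
                .setDBUrl("jdbc:oracle:thin:@//db.example.com:1521/ORCL") // placeholder
                .setUsername("user")
                .setPassword("secret")
                .setQuery(query) // parameters are bound from the tuple fields in order
                .finish();
    }
}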

Cheers,
Max
— 
Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com * 0176 1000 
75 50
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082



Re: Backpressure in the context of JDBCOutputFormat update

2016-01-21 Thread Maximilian Bode
Hi Robert,
sorry, I should have been clearer in my initial mail. The two cases I was 
comparing are:

1) distinct() before Insert (which is necessary as we have a unique key 
constraint in our database), no distinct() before update
2) distinct() before insert AND distinct() before update

The test data used actually only contains unique values for the affected field 
though, so the dataset size is not reduced in case 2.

In case 1, the insert does not start until all the data has arrived at 
distinct(), while the update is already going along (slowing down upstream 
operators as well). In case 2, both sinks wait for their respective distinct()'s 
(which are reached much faster now), then start roughly at the same time, leading 
to a shorter net job time for case 2 compared to case 1.

A pause operator might be useful, yes.

The update should not be an inherently much more expensive operation, as the 
WHERE clause only contains the table's primary key.

Cheers,
Max
— 
Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com * 0176 1000 
75 50
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

> On 21.01.2016, at 15:57, Robert Metzger <rmetz...@apache.org> wrote:
> 
> Hi Max,
> 
> is the distinct() operation reducing the size of the DataSet? If so, I assume 
> you have an idempotent update and the job is faster because fewer updates are 
> done?
> If the distinct() operator is not changing anything, then the job might be 
> faster because the INSERT is done while Flink is still executing the 
> distinct() operation. So the insert is over when the updates are starting. 
> This would mean that concurrent inserts and updates on the database are much 
> slower than doing this sequentially.
> 
> I'm wondering if there is a way in Flink to explicitly ask for spilling an 
> intermediate operator to "pause" execution:
> 
> Source --> (spill for pausing) --> (update sink)
>        \
>         --> (insert)
> 
> I don't have a lot of practical experience with RDBMS, but I guess updates 
> are slower because an index lookup + update is necessary. Maybe optimizing 
> the database configuration / schema / indexes is more promising. I think it's 
> indeed much nicer to avoid any unnecessary steps in Flink.
> 
> Did you do any "microbenchmarks" for the update and insert part? I guess that 
> would help a lot to understand the impact of certain index structures, 
> batching sizes, or database drivers.
> 
> Regards,
> Robert
> 
> 
> 
> 
> On Thu, Jan 21, 2016 at 3:35 PM, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
> Hi everyone,
> 
> in a Flink (0.10.1) job with two JDBCOutputFormat sinks, one of them (doing a 
> database update) is performing slower than the other one (an insert). The job 
> as a whole is also slow as upstream operators are slowed down due to 
> backpressure. I am able to speed up the whole job by introducing an a priori 
> unnecessary .distinct(), which of course blocks downstream execution of the 
> slow sink, which in turn seems to be able to execute faster when given all 
> data at once.
> 
> Any ideas what is going on here? Is there something I can do without 
> introducing unnecessary computation steps?
> 
> Cheers,
> Max
> — 
> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com * 0176 1000 75 50
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> 
> 



JDBCInputFormat GC overhead limit exceeded error

2016-01-19 Thread Maximilian Bode
Hi everyone,

I am facing a problem using the JDBCInputFormat which occurred in a larger 
Flink job. As a minimal example I can reproduce it when just writing data into 
a csv after having read it from a database, i.e.

DataSet<Tuple1<String>> existingData = env.createInput(
JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("oracle.jdbc.driver.OracleDriver")
.setUsername(…)
.setPassword(…)
.setDBUrl(…)
.setQuery("select DATA from TABLENAME")
.finish(),
new TupleTypeInfo<>(Tuple1.class, BasicTypeInfo.STRING_TYPE_INFO));
existingData.writeAsCsv(…);

where DATA is a column containing strings of length ~25 characters and 
TABLENAME contains 20 million rows.

After starting the job on a YARN cluster (using -tm 3072 and leaving the other 
memory settings at default values), Flink happily goes along at first but then 
fails after something like three million records have been sent by the 
JDBCInputFormat. The Exception reads "The slot in which the task was executed 
has been released. Probably loss of TaskManager …". The local taskmanager.log 
in the affected container reads
"java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1063)
at 
org.jboss.netty.channel.socket.nio.NioClientBoss.processConnectTimeout(NioClientBoss.java:119)
at 
org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:83)
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
at 
org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)"

Any ideas what is going wrong here?

Cheers,
Max

— 
Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082



Re: JDBCInputFormat GC overhead limit exceeded error

2016-01-19 Thread Maximilian Bode
Hi Robert,

I am using 0.10.1.

> On 19.01.2016, at 17:42, Robert Metzger <rmetz...@apache.org> wrote:
> 
> Hi Max,
> 
> which version of Flink are you using?
> 
> On Tue, Jan 19, 2016 at 5:35 PM, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
> Hi everyone,
> 
> I am facing a problem using the JDBCInputFormat which occurred in a larger 
> Flink job. As a minimal example I can reproduce it when just writing data 
> into a csv after having read it from a database, i.e.
> 
> DataSet<Tuple1<String>> existingData = env.createInput(
>   JDBCInputFormat.buildJDBCInputFormat()
>   .setDrivername("oracle.jdbc.driver.OracleDriver")
>   .setUsername(…)
>   .setPassword(…)
>   .setDBUrl(…)
>   .setQuery("select DATA from TABLENAME")
>   .finish(),
>   new TupleTypeInfo<>(Tuple1.class, BasicTypeInfo.STRING_TYPE_INFO));
> existingData.writeAsCsv(…);
> 
> where DATA is a column containing strings of length ~25 characters and 
> TABLENAME contains 20 million rows.
> 
> After starting the job on a YARN cluster (using -tm 3072 and leaving the 
> other memory settings at default values), Flink happily goes along at first 
> but then fails after something like three million records have been sent by 
> the JDBCInputFormat. The Exception reads "The slot in which the task was 
> executed has been released. Probably loss of TaskManager …". The local 
> taskmanager.log in the affected container reads
> "java.lang.OutOfMemoryError: GC overhead limit exceeded
> at 
> java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1063)
> at 
> org.jboss.netty.channel.socket.nio.NioClientBoss.processConnectTimeout(NioClientBoss.java:119)
> at 
> org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:83)
> at 
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
> at 
> org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)"
> 
> Any ideas what is going wrong here?
> 
> Cheers,
> Max
> 
> —
> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> 
> 



signature.asc
Description: Message signed with OpenPGP using GPGMail


Re: Flink on EMR Question

2016-01-05 Thread Maximilian Bode
Hi everyone,

Regarding Q1, I believe I have witnessed a comparable phenomenon in a (3-node, 
non-EMR) YARN cluster. After shutting down the yarn session via `stop`, one 
container seems to linger around. `yarn application -list` is empty, whereas 
`bin/yarn-session.sh -q` lists the left-over container. Also, there is still 
one application shown as ‚running‘ in Ambari’s YARN pane under current 
applications. Then, after some time (order of a few minutes) it disappears and 
the resources are available again.

I have not tested this behavior extensively so far. Noticeably, I was not able 
to reproduce it by just starting a session and then ending it again right away 
without looking at the JobManager web interface. Maybe this produces some kind 
of lag as far as YARN containers are concerned?

Cheers,
Max

> On 04.01.2016, at 12:52, Chiwan Park wrote:
> 
> Hi All,
> 
> I have some problems using Flink on Amazon EMR cluster.
> 
> Q1. Sometimes, a jobmanager container still exists after destroying the yarn 
> session by pressing Ctrl+C. In that case, the Flink YARN app seems to have 
> exited correctly in the YARN RM dashboard, but there is still a running 
> container in the dashboard. From the logs of the container, I realized that 
> the container is the jobmanager.
> 
> I cannot kill the container because there is no permission to restart the 
> YARN RM in Amazon EMR. In my small Hadoop cluster (with 3 nodes), the problem 
> doesn’t appear.
> 
> Q2. I tried to use the S3 file system with Flink on EMR, but I can’t use it 
> because of a version conflict of Apache HttpClient. By default, the S3 file 
> system implementation in EMR is `com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem`, 
> which is linked against another version of Apache HttpClient.
> 
> As I wrote above, I cannot restart the Hadoop cluster after modifying 
> conf-site.xml because of a lack of permission. How can I solve this problem?
> 
> Regards,
> Chiwan Park
> 
> 



signature.asc
Description: Message signed with OpenPGP using GPGMail


Checkpoints in batch processing & JDBC Output Format

2015-11-09 Thread Maximilian Bode
Hi everyone,

I am considering using Flink in a project. The setting would be a YARN cluster 
where data is first read in from HDFS, then processed and finally written into 
an Oracle database using an upsert command. If I understand the documentation 
correctly, the DataSet API would be the natural candidate for this problem.

My first question is about the checkpointing system. Apparently (e.g. [1] and 
[2]) it does not apply to batch processing. So how does Flink handle failures 
during batch processing? For the use case described above, 'at least once' 
semantics would suffice – still, are 'exactly once' guarantees possible?
For example, how does Flink handle a failure of one taskmanager during a batch 
process? What happens in this case, if the data has already partly been written 
to the database?

Secondly, the most obvious, straightforward approach to connecting to the 
Oracle DB would be the JDBC Output Format. In [3], it was mentioned that it 
does not have many users and might not be trusted. What is the status on this?

Best regards,
Max

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-and-Spark-tp583p587.html
[2] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Batch-Processing-as-Streaming-td1909.html
[3] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PotsgreSQL-JDBC-Sink-quot-writeRecord-failed-quot-and-quot-Batch-element-cancelled-quot-on-upsert-td623.html
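On the upsert itself, a rough sketch of what the JDBCOutputFormat route could
look like against Oracle is given below. The builder calls follow the
flink-jdbc module; the table, columns, MERGE statement and connection details
are made-up placeholders and untested.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

public class UpsertJob {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the data read from HDFS; fields are (id, name).
        DataSet<Tuple2<Long, String>> rows = env.fromElements(
                new Tuple2<>(1L, "alice"), new Tuple2<>(2L, "bob"));

        rows.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("oracle.jdbc.driver.OracleDriver")
                .setDBUrl("jdbc:oracle:thin:@//db.example.com:1521/ORCL") // placeholder
                .setUsername("user")
                .setPassword("secret")
                // Oracle-style upsert; the tuple fields are bound to the ? parameters in order.
                .setQuery("MERGE INTO persons p USING (SELECT ? id, ? name FROM dual) s "
                        + "ON (p.id = s.id) "
                        + "WHEN MATCHED THEN UPDATE SET p.name = s.name "
                        + "WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.id, s.name)")
                .finish());

        env.execute("upsert into oracle");
    }
}

Note that this only addresses the second question; it does not by itself give
'exactly once' semantics if the job fails mid-write, which is what the first
question is about.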


signature.asc
Description: Message signed with OpenPGP using GPGMail