Re: HDFS data locality and distribution

2018-03-19 Thread Reinier Kip
Hi Chesnay,


Thanks for responding.


I managed to resolve the problem last Friday: I had a single datasource for
each file, instead of one big datasource for all the files. Reading the one or
two HDFS blocks within each datasource was therefore distributed to only a
small percentage of slots (say ~10%). This was Beam-runner-specific knowledge
for Flink that I did not yet have.
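
Roughly, the difference was the following (a simplified sketch, not my actual
code; class, file and path names are made up):

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class ReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    List<String> files =
        Arrays.asList("hdfs:///path/to/input/part-0", "hdfs:///path/to/input/part-1");

    // What I had: one datasource per file. Each source covers only one or two
    // HDFS blocks, so each one is read by just a couple of subtasks.
    PCollectionList<String> perFile = PCollectionList.empty(pipeline);
    for (String file : files) {
      perFile = perFile.and(pipeline.apply("Read " + file, TextIO.read().from(file)));
    }
    PCollection<String> linesBefore = perFile.apply(Flatten.pCollections());

    // What resolved it: one big datasource over all files, whose input splits
    // are then spread over the available slots.
    PCollection<String> linesAfter =
        pipeline.apply("ReadAll", TextIO.read().from("hdfs:///path/to/input/*"));

    pipeline.run();
  }
}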


> the function after the groupBy() should still make full use of the 
> parallelism of the cluster
> Do note that data skew can affect how much data is distributed to each node


I do not remember seeing this behaviour; instead, I remember that data was
redistributed only among the slots that did the reading, but I cannot verify
this at this point. Also, I do not know exactly how Beam operators map to
Flink's. There are millions of keys and their distribution is quite uniform.


Reinier


From: Chesnay Schepler <ches...@apache.org>
Sent: 13 March 2018 12:40:02
To: user@flink.apache.org
Subject: Re: HDFS data locality and distribution

Hello,

You said that "data is distributed very badly across slots"; do you mean that 
only a small number of subtasks is reading from HDFS, or that the keyed data is 
only processed by a few subtasks?

Flink does prioritize data locality over data distribution when reading the
files, but the function after the groupBy() should still make full use of the
parallelism of the cluster. Do note that data skew can affect how much data is
distributed to each node; i.e., if 80% of your data has the same key (or rather
the same hash), it will all end up on the same node.
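
As a toy illustration of the skew point (this is not Flink's actual
partitioning code, just the general idea of hash-based distribution):

public class SkewSketch {
  public static void main(String[] args) {
    int parallelism = 32;
    String key = "someKey"; // hypothetical key
    // Records are routed to a downstream subtask based on the hash of their key,
    // so every record with the same key (hash) lands on the same subtask, no
    // matter how many slots are available.
    int targetSubtask = Math.floorMod(key.hashCode(), parallelism);
    System.out.println(key + " -> subtask " + targetSubtask);
  }
}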

On 12.03.2018 13:49, Reinier Kip wrote:

Relevant versions: Beam 2.1, Flink 1.3.

____________
From: Reinier Kip <r...@bol.com>
Sent: 12 March 2018 13:45:47
To: user@flink.apache.org
Subject: HDFS data locality and distribution


Hey all,


I'm trying to batch-process 30-ish files from HDFS, but I see that data is
distributed very badly across slots: 4 out of 32 slots get 4/5ths of the data,
another 3 slots get about 1/5th, and a last slot gets just a few records. This
probably triggers disk spillover on those slots and slows down the job
immensely. The data has many, many unique keys, and processing could be done in
a highly parallel manner. From what I understand, HDFS data locality governs
which splits are assigned to which subtask.


  *   I'm running a Beam-on-Flink-on-YARN pipeline.
  *   I'm reading 30-ish files, whose records are later grouped by their
millions of unique keys.
  *   For now, I have 8 task managers with 4 slots each. Beam sets all subtasks
to a parallelism of 32 (see the sketch after this list).
  *   Data seems to be localised to 9 out of the 32 slots, on 3 out of the 8
task managers.
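
The parallelism is pinned roughly as follows (a sketch, assuming the Flink
runner picks these options up the way I expect; not my actual code):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class OptionsSketch {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // 8 task managers * 4 slots each = 32 slots; every subtask gets parallelism 32.
    options.setParallelism(32);
    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline ...
    pipeline.run();
  }
}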


Does my statement about input split assignment ring true? Is the fact that data
isn't redistributed an effort by Flink to achieve high data locality, even if
this means disk spillover for a few slots/TMs and idleness for the others? Is
there any use for parallelism if work isn't distributed anyway?


Thanks for your time, Reinier



Re: Repeated exceptions during system metrics registration

2017-09-29 Thread Reinier Kip
Why of course... Thank you for your time. I'll figure out where to go with Beam.
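
For the record, this is what I'll try first (a sketch; I'm assuming, without
having verified it, that the names passed to apply() end up in the Flink
operator names):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class NamingSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    PCollection<String> events =
        pipeline.apply("ReadEvents", Create.of("user1,sessionA", "user2,sessionB"));
    // Instead of two anonymous "Map" steps, give each step an explicit, unique name.
    PCollection<String> userIds =
        events.apply("ExtractUserId",
            MapElements.into(TypeDescriptors.strings()).via((String e) -> e.split(",")[0]));
    PCollection<String> sessionIds =
        events.apply("ExtractSessionId",
            MapElements.into(TypeDescriptors.strings()).via((String e) -> e.split(",")[1]));
    pipeline.run();
  }
}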


From: Chesnay Schepler <ches...@apache.org>
Sent: 29 September 2017 16:41:23
To: user@flink.apache.org
Subject: Re: Repeated exceptions during system metrics registration

You probably have multiple operators that are called "Map", which means the
metric identifier is not unique.
As a result, only one of these metrics is reported (whichever was registered
first).

Giving each operator a unique name will resolve this issue, but I don't know 
exactly how to do that with Beam.

On 29.09.2017 16:03, Reinier Kip wrote:
Hi all,

I'm running a Beam pipeline on Flink and sending metrics via the Graphite
reporter. I get repeated exceptions on the slaves, which try to register the
same metric multiple times. Jobmanager and taskmanager data is fine (I can see
the JVM metrics), but I only get a datapoint here and there for tasks/operations.

I am using Beam 2.1.0, and am thus running Flink 1.3.0. What could be the cause
of this, or how can I find out?

Below you'll find the error, followed by Flink's metrics configuration.

Reinier

--

Log message: [ERROR] Error while registering metric.
Stack trace:

java.lang.IllegalArgumentException: A metric named bla.hdp-slave-015.taskmanager.3e32c83dedd7f4c7de3c41e2cb6d2a4a.bla-0929120623-613dc6af.operator.Map (Key Extractor).15.numRecordsOutPerSecond already exists
at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
at org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:151)
at org.apache.flink.runtime.metrics.MetricRegistry.register(MetricRegistry.java:294)
at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:370)
at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.meter(AbstractMetricGroup.java:336)
at org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup.<init>(OperatorIOMetricGroup.java:42)
at org.apache.flink.runtime.metrics.groups.OperatorMetricGroup.<init>(OperatorMetricGroup.java:45)
at org.apache.flink.runtime.metrics.groups.TaskMetricGroup.addOperator(TaskMetricGroup.java:133)
at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:72)
at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1299)
at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1015)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)

--

metrics.reporters: graphite
metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.graphite.host: something
metrics.reporter.graphite.port: 2003
metrics.reporter.graphite.protocol: TCP
metrics.reporter.graphite.interval: 1 SECONDS
metrics.scope.jm: bla.<host>.jobmanager
metrics.scope.jm.job: bla.<host>.jobmanager.<job_name>
metrics.scope.tm: bla.<host>.taskmanager.<tm_id>
metrics.scope.tm.job: bla.<host>.taskmanager.<tm_id>.<job_name>
metrics.scope.task: bla.<host>.taskmanager.<tm_id>.<job_name>.task.<task_name>.<subtask_index>
metrics.scope.operator: bla.<host>.taskmanager.<tm_id>.<job_name>.operator.<operator_name>.<subtask_index>



EOFException related to memory segments during run of Beam pipeline on Flink

2017-08-30 Thread Reinier Kip
Hi all,

I’ve been running a Beam pipeline on Flink. Depending on the dataset size and 
the heap memory configuration of the jobmanager and taskmanager, I may run into 
an EOFException, which causes the job to fail. You will find the stacktrace 
near the bottom of this post (data censored).

I would not expect such a sudden failure just because the dataset apparently
grows above a certain size. Doesn’t Flink spill data over to disk when memory
runs out? How do I deal with this unpredictable behaviour in a production
situation? I’m running a clean Flink 1.3.2 with heap memory of 768MiB. The
dataset size is in the tens of megabytes. The same root EOFException occurred
in Flink 1.2.1. I will gladly provide more information where needed.
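
For completeness, I assume the knobs involved are something like the following
in flink-conf.yaml (example values, not my actual configuration; I'm not sure
these are the right ones to tune):

taskmanager.heap.mb: 768
# Fraction of the free heap handed to Flink's managed memory, which the sorter uses.
taskmanager.memory.fraction: 0.7
taskmanager.memory.preallocate: false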

If this is expected behaviour, I feel it should be documented, both with a more
informative exception message and by managing user expectations in the guides.
(I have not been able to find any information regarding this exception.)

Hoping that someone can enlighten me,

Reinier


08/30/2017 13:48:33 GroupReduce (GroupReduce at GroupByKey)(1/1) switched to FAILED
java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:466)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.beam.sdk.coders.CoderException: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:129)
at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:48)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:76)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:60)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:33)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:99)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:652)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:641)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:599)
at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:80)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:125)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.write(NormalizedKeySorter.java:281)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1037)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.EOFException
at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.nextSegment(SimpleCollectingOutputView.java:79)
at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:190)
at