Re: HDFS data locality and distribution
Hi Chesnay,

Thanks for responding. I managed to resolve the problem last Friday; I had a single datasource for each file, instead of one big datasource for all the files. The reading of the one or two HDFS blocks within each datasource was then distributed to a small percentage of slots (let's say ~10%). That was some Beam-runner-specific knowledge of Flink that I did not yet have.

> the function after the groupBy() should still make full use of the
> parallelism of the cluster

> Do note that data skew can affect how much data is distributed to each node

I do not remember seeing this behaviour; instead, I remember data being redistributed only among the slots that did the reading, but I cannot verify this at this point. Also, I do not know exactly how Beam operators map to Flink's. Key distribution is in the millions and quite uniform.

Reinier

________________________________
From: Chesnay Schepler <ches...@apache.org>
Sent: 13 March 2018 12:40:02
To: user@flink.apache.org
Subject: Re: HDFS data locality and distribution

Hello,

You said that "data is distributed very badly across slots"; do you mean that only a small number of subtasks are reading from HDFS, or that the keyed data is only processed by a few subtasks?

Flink does prioritize data locality over data distribution when reading the files, but the function after the groupBy() should still make full use of the parallelism of the cluster. Do note that data skew can affect how much data is distributed to each node, i.e. if 80% of your data has the same key (or rather hash), it will all end up on the same node.

On 12.03.2018 13:49, Reinier Kip wrote:

Relevant versions: Beam 2.1, Flink 1.3.

________________________________
From: Reinier Kip <r...@bol.com>
Sent: 12 March 2018 13:45:47
To: user@flink.apache.org
Subject: HDFS data locality and distribution

Hey all,

I'm trying to batch-process 30-ish files from HDFS, but I see that data is distributed very badly across slots. Four out of 32 slots get 4/5ths of the data, another three slots get about 1/5th, and a last slot gets just a few records. This probably triggers disk spillover on these slots and slows down the job immensely. The data has many, many unique keys and processing could be done in a highly parallel manner. From what I understand, HDFS data locality governs which splits are assigned to which subtask.

* I'm running a Beam on Flink on YARN pipeline.
* I'm reading 30-ish files, whose records are later grouped by their millions of unique keys.
* For now, I have 8 task managers with 4 slots each. Beam sets all subtasks to a parallelism of 32.
* Data seems to be localised to 9 out of the 32 slots, 3 out of the 8 task managers.

Does the statement about input split assignment ring true? Is the fact that data isn't redistributed an effort from Flink to achieve high data locality, even if this means disk spillover for a few slots/TMs and idleness for others? Is there any use for parallelism if work isn't distributed anyway?

Thanks for your time,

Reinier
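For illustration only: a minimal Beam (Java) sketch of the two source layouts discussed in the message above, one read per file versus a single read over all files. The class name, transform names, and the HDFS glob are invented for this sketch and are not taken from the original pipeline.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

import java.util.List;

public class SourceLayouts {

  // Problematic layout described above: one datasource per file. Each read is
  // its own source with only one or two HDFS blocks' worth of splits, so only
  // a handful of subtasks ever receive input.
  static PCollection<String> oneSourcePerFile(Pipeline p, List<String> files) {
    PCollectionList<String> perFile = PCollectionList.empty(p);
    for (String file : files) {
      perFile = perFile.and(p.apply("Read " + file, TextIO.read().from(file)));
    }
    return perFile.apply("FlattenFiles", Flatten.pCollections());
  }

  // Alternative layout: one datasource over all files (here via a glob), so
  // all splits belong to a single source and can be spread across subtasks.
  static PCollection<String> oneSourceForAllFiles(Pipeline p) {
    return p.apply("ReadAllFiles", TextIO.read().from("hdfs:///path/to/input/part-*"));
  }
}

With the per-file layout, each source only carries the splits of its own file, which matches the behaviour described above where roughly 10% of the slots received input.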
Re: HDFS data locality and distribution
Relevant versions: Beam 2.1, Flink 1.3.

________________________________
From: Reinier Kip <r...@bol.com>
Sent: 12 March 2018 13:45:47
To: user@flink.apache.org
Subject: HDFS data locality and distribution

Hey all,

I'm trying to batch-process 30-ish files from HDFS, but I see that data is distributed very badly across slots. Four out of 32 slots get 4/5ths of the data, another three slots get about 1/5th, and a last slot gets just a few records. This probably triggers disk spillover on these slots and slows down the job immensely. The data has many, many unique keys and processing could be done in a highly parallel manner. From what I understand, HDFS data locality governs which splits are assigned to which subtask.

* I'm running a Beam on Flink on YARN pipeline.
* I'm reading 30-ish files, whose records are later grouped by their millions of unique keys.
* For now, I have 8 task managers with 4 slots each. Beam sets all subtasks to a parallelism of 32.
* Data seems to be localised to 9 out of the 32 slots, 3 out of the 8 task managers.

Does the statement about input split assignment ring true? Is the fact that data isn't redistributed an effort from Flink to achieve high data locality, even if this means disk spillover for a few slots/TMs and idleness for others? Is there any use for parallelism if work isn't distributed anyway?

Thanks for your time,

Reinier
HDFS data locality and distribution
Hey all,

I'm trying to batch-process 30-ish files from HDFS, but I see that data is distributed very badly across slots. Four out of 32 slots get 4/5ths of the data, another three slots get about 1/5th, and a last slot gets just a few records. This probably triggers disk spillover on these slots and slows down the job immensely. The data has many, many unique keys and processing could be done in a highly parallel manner. From what I understand, HDFS data locality governs which splits are assigned to which subtask.

* I'm running a Beam on Flink on YARN pipeline.
* I'm reading 30-ish files, whose records are later grouped by their millions of unique keys.
* For now, I have 8 task managers with 4 slots each. Beam sets all subtasks to a parallelism of 32.
* Data seems to be localised to 9 out of the 32 slots, 3 out of the 8 task managers.

Does the statement about input split assignment ring true? Is the fact that data isn't redistributed an effort from Flink to achieve high data locality, even if this means disk spillover for a few slots/TMs and idleness for others? Is there any use for parallelism if work isn't distributed anyway?

Thanks for your time,

Reinier
Re: Repeated exceptions during system metrics registration
Why of course... Thank you for your time. I'll figure out where to go with Beam.

________________________________
From: Chesnay Schepler <ches...@apache.org>
Sent: 29 September 2017 16:41:23
To: user@flink.apache.org
Subject: Re: Repeated exceptions during system metrics registration

You probably have multiple operators that are called "Map", which causes the metric identifier to not be unique. As a result, only one of these metrics is reported (whichever was registered first). Giving each operator a unique name will resolve this issue, but I don't know exactly how to do that with Beam.

On 29.09.2017 16:03, Reinier Kip wrote:

Hi all,

I'm running a Beam pipeline on Flink and sending metrics via the Graphite reporter. I get repeated exceptions on the slaves, which try to register the same metric multiple times. Jobmanager and taskmanager data is fine: I can see JVM stuff, but only one datapoint here and there for tasks/operations. I am using Beam 2.1.0, and am thus running Flink 1.3.0. What can be the cause of this, or how can I find out? Below you'll find the error, followed by Flink's metrics configuration.

Reinier

--

Log message:

[ERROR] Error while registering metric.
Stack trace:
java.lang.IllegalArgumentException: A metric named bla.hdp-slave-015.taskmanager.3e32c83dedd7f4c7de3c41e2cb6d2a4a.bla-0929120623-613dc6af.operator.Map (Key Extractor).15.numRecordsOutPerSecond already exists
    at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
    at org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:151)
    at org.apache.flink.runtime.metrics.MetricRegistry.register(MetricRegistry.java:294)
    at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:370)
    at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.meter(AbstractMetricGroup.java:336)
    at org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup.<init>(OperatorIOMetricGroup.java:42)
    at org.apache.flink.runtime.metrics.groups.OperatorMetricGroup.<init>(OperatorMetricGroup.java:45)
    at org.apache.flink.runtime.metrics.groups.TaskMetricGroup.addOperator(TaskMetricGroup.java:133)
    at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:72)
    at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1299)
    at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1015)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)

--

metrics.reporters: graphite
metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.graphite.host: something
metrics.reporter.graphite.port: 2003
metrics.reporter.graphite.protocol: TCP
metrics.reporter.graphite.interval: 1 SECONDS
metrics.scope.jm: bla.<host>.jobmanager
metrics.scope.jm.job: bla.<host>.jobmanager.<job_name>
metrics.scope.tm: bla.<host>.taskmanager.<tm_id>
metrics.scope.tm.job: bla.<host>.taskmanager.<tm_id>.<job_name>
metrics.scope.task: bla.<host>.taskmanager.<tm_id>.<job_name>.task.<task_name>.<subtask_index>
metrics.scope.operator: bla.<host>.taskmanager.<tm_id>.<job_name>.operator.<operator_name>.<subtask_index>
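For what it's worth, in Beam's Java SDK every apply() call can be given an explicit, unique name, and the Flink runner uses that name for the resulting operator (and thus in the metric identifier). A minimal, self-contained sketch follows; the pipeline contents and transform names are invented for illustration and are not the job from this thread, and whether this fully avoids the duplicate registration with the Beam 2.1.0 Flink runner is not confirmed here.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class UniquelyNamedTransforms {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> lines = p.apply("CreateSampleLines", Create.of("a 1", "b 2", "a 3"));

    // Without an explicit name, map-like steps tend to show up generically
    // (e.g. "Map (Key Extractor)") and can collide with other map-like steps.
    PCollection<String> keys = lines.apply("ExtractKey",
        MapElements.via(new SimpleFunction<String, String>() {
          @Override
          public String apply(String line) {
            return line.split(" ")[0];
          }
        }));

    PCollection<KV<String, Long>> counts = keys.apply("CountPerKey", Count.perElement());

    p.run().waitUntilFinish();
  }
}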
Repeated exceptions during system metrics registration
Hi all,

I'm running a Beam pipeline on Flink and sending metrics via the Graphite reporter. I get repeated exceptions on the slaves, which try to register the same metric multiple times. Jobmanager and taskmanager data is fine: I can see JVM stuff, but only one datapoint here and there for tasks/operations. I am using Beam 2.1.0, and am thus running Flink 1.3.0. What can be the cause of this, or how can I find out? Below you'll find the error, followed by Flink's metrics configuration.

Reinier

--

Log message:

[ERROR] Error while registering metric.
Stack trace:
java.lang.IllegalArgumentException: A metric named bla.hdp-slave-015.taskmanager.3e32c83dedd7f4c7de3c41e2cb6d2a4a.bla-0929120623-613dc6af.operator.Map (Key Extractor).15.numRecordsOutPerSecond already exists
    at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
    at org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:151)
    at org.apache.flink.runtime.metrics.MetricRegistry.register(MetricRegistry.java:294)
    at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:370)
    at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.meter(AbstractMetricGroup.java:336)
    at org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup.<init>(OperatorIOMetricGroup.java:42)
    at org.apache.flink.runtime.metrics.groups.OperatorMetricGroup.<init>(OperatorMetricGroup.java:45)
    at org.apache.flink.runtime.metrics.groups.TaskMetricGroup.addOperator(TaskMetricGroup.java:133)
    at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:72)
    at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1299)
    at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1015)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)

--

metrics.reporters: graphite
metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.graphite.host: something
metrics.reporter.graphite.port: 2003
metrics.reporter.graphite.protocol: TCP
metrics.reporter.graphite.interval: 1 SECONDS
metrics.scope.jm: bla.<host>.jobmanager
metrics.scope.jm.job: bla.<host>.jobmanager.<job_name>
metrics.scope.tm: bla.<host>.taskmanager.<tm_id>
metrics.scope.tm.job: bla.<host>.taskmanager.<tm_id>.<job_name>
metrics.scope.task: bla.<host>.taskmanager.<tm_id>.<job_name>.task.<task_name>.<subtask_index>
metrics.scope.operator: bla.<host>.taskmanager.<tm_id>.<job_name>.operator.<operator_name>.<subtask_index>
EOFException related to memory segments during run of Beam pipeline on Flink
Hi all,

I've been running a Beam pipeline on Flink. Depending on the dataset size and the heap memory configuration of the jobmanager and taskmanager, I may run into an EOFException, which causes the job to fail. You will find the stacktrace near the bottom of this post (data censored). I would not expect such a sudden failure as the dataset apparently grows above a certain size. Doesn't Flink spill data over to disk when memory runs out? How do I deal with this unpredictable behaviour in a production situation?

I'm running a clean Flink 1.3.2 with heap memory of 768MiB. The dataset size is in the tens of megabytes. The same root EOFException occurred in Flink 1.2.1. I will gladly provide more information where needed.

If this is expected behaviour, I feel it should be documented, meaning a more informative exception message and managing user expectations in the guides. (I have not been able to find any information regarding this exception.)

Hoping that someone can enlighten me,

Reinier

08/30/2017 13:48:33    GroupReduce (GroupReduce at GroupByKey)(1/1) switched to FAILED
java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:466)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.beam.sdk.coders.CoderException: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
    at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:129)
    at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:48)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
    at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:76)
    at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:60)
    at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:33)
    at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:99)
    at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:652)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:641)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:599)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:80)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:125)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.write(NormalizedKeySorter.java:281)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1037)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.EOFException
    at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.nextSegment(SimpleCollectingOutputView.java:79)
    at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
    at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:190)
    at
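As context for the memory question above, a hedged sketch of how the described setup might be expressed in flink-conf.yaml (Flink 1.3.x keys); the actual configuration is not shown in the thread, and these values are illustrative only, not a confirmed fix:

jobmanager.heap.mb: 768            # JobManager JVM heap; the post above only states "heap memory of 768MiB"
taskmanager.heap.mb: 768           # TaskManager JVM heap
taskmanager.memory.fraction: 0.7   # default share of free heap used as Flink managed memory,
                                   # from which batch operators such as the SortMerger take their buffers

One possible reading of the trace, not confirmed in this thread: Flink's sorter normally signals a full in-memory sort buffer with an EOFException and then spills to disk, but Beam's SerializableCoder wraps that exception into a CoderException ("unable to serialize record"), so the sorter sees a serialization failure instead of a full buffer and the job fails rather than spilling.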