[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=151550=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151550 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 05/Oct/18 09:01
Start Date: 05/Oct/18 09:01
Worklog Time Spent: 10m
Work Description: iemejia closed pull request #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 7a7f9902db2..651e5223721 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.spark;
 
 import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -99,6 +100,16 @@ public String create(PipelineOptions options) {
 
   void setCheckpointDurationMillis(Long durationMillis);
 
+  @Description(
+      "If set bundleSize will be used for splitting BoundedSources, otherwise default to "
+          + "splitting BoundedSources on Spark defaultParallelism. Most effective when used with "
+          + "Spark dynamicAllocation.")
+  @Default.Long(0)
+  Long getBundleSize();
+
+  @Experimental
+  void setBundleSize(Long value);
+
   @Description("Enable/disable sending aggregator values to Spark's metric sinks")
   @Default.Boolean(true)
   Boolean getEnableSparkMetricSinks();
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
index 67d0ab25e72..c94c4ec2a9e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -30,6 +30,7 @@
 import java.util.NoSuchElementException;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
@@ -65,6 +66,7 @@
     private final BoundedSource source;
     private final SerializablePipelineOptions options;
     private final int numPartitions;
+    private final long bundleSize;
     private final String stepName;
     private final Accumulator metricsAccum;
 
@@ -88,6 +90,7 @@ public Bounded(
       // ** the configuration "spark.default.parallelism" takes precedence over all of the above **
       this.numPartitions = sc.defaultParallelism();
       checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
+      this.bundleSize = options.get().as(SparkPipelineOptions.class).getBundleSize();
       this.stepName = stepName;
       this.metricsAccum = MetricsAccumulator.getInstance();
     }
@@ -96,19 +99,23 @@ public Bounded(
 
     @Override
     public Partition[] getPartitions() {
-      long desiredSizeBytes = DEFAULT_BUNDLE_SIZE;
       try {
-        desiredSizeBytes = source.getEstimatedSizeBytes(options.get()) / numPartitions;
-      } catch (Exception e) {
-        LOG.warn(
-            "Failed to get estimated bundle size for source {}, using default bundle "
-                + "size of {} bytes.",
-            source,
-            DEFAULT_BUNDLE_SIZE);
-      }
-      try {
-        List> partitionedSources =
-            source.split(desiredSizeBytes, options.get());
+        List> partitionedSources;
+        if (bundleSize > 0) {
+          partitionedSources = source.split(bundleSize, options.get());
+        } else {
+          long desiredSizeBytes = DEFAULT_BUNDLE_SIZE;
+          try {
+            desiredSizeBytes = source.getEstimatedSizeBytes(options.get()) / numPartitions;
+          } catch (Exception e) {
+            LOG.warn(
+                "Failed to get estimated bundle size for source {}, using default bundle "
+                    + "size of {} bytes.",
+                source,
+                DEFAULT_BUNDLE_SIZE);
+          }
+          partitionedSources = source.split(desiredSizeBytes, options.get());
+
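For readers skimming the diff, the split-size decision it introduces in getPartitions() can be sketched as a small standalone function. This is only an illustration and not the SourceRDD code itself: the class name BundleSizing, the method desiredSplitSize, the 64 MiB value used for DEFAULT_BUNDLE_SIZE, and the use of a null estimate to model getEstimatedSizeBytes() throwing are all assumptions made for the sketch.

```java
final class BundleSizing {
  // Hypothetical placeholder: the real DEFAULT_BUNDLE_SIZE value is not shown in the diff.
  static final long DEFAULT_BUNDLE_SIZE = 64L * 1024 * 1024;

  /**
   * Mirrors the branch structure of the patched getPartitions().
   * A null estimatedSizeBytes models the size estimation throwing an exception.
   */
  static long desiredSplitSize(long bundleSize, Long estimatedSizeBytes, int numPartitions) {
    if (bundleSize > 0) {
      return bundleSize; // an explicitly set bundleSize takes priority
    }
    if (estimatedSizeBytes == null) {
      return DEFAULT_BUNDLE_SIZE; // estimation failed, keep the default
    }
    return estimatedSizeBytes / numPartitions; // previous behaviour: divide by defaultParallelism
  }

  public static void main(String[] args) {
    long tenGiB = 10L * 1024 * 1024 * 1024;
    // bundleSize unset, defaultParallelism of 2: each split is half of the input.
    System.out.println(desiredSplitSize(0, tenGiB, 2));
    // bundleSize set to 128 MiB: the split size no longer depends on the core count.
    System.out.println(desiredSplitSize(128L * 1024 * 1024, tenGiB, 2));
    // Size estimation failed and no bundleSize: fall back to the default.
    System.out.println(desiredSplitSize(0, null, 2));
  }
}
```

With the option left at its default of 0, the behaviour is unchanged from before the patch, which matches the reviewers' request that the default partitioning remain the default.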
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=151527=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151527 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 05/Oct/18 07:58
Start Date: 05/Oct/18 07:58
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-427279202

Run Spark ValidatesRunner

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 151527)
Time Spent: 4.5h (was: 4h 20m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Reporter: Kyle Winkelman
> Assignee: Kyle Winkelman
> Priority: Major
> Fix For: 2.8.0
>
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It
> then falls back to the value calculated in this description:
> // when running on YARN/SparkDeploy it's the result of max(totalCores, 2).
> // when running on Mesos it's 8.
> // when running local it's the total number of cores (local = 1, local[N] = N,
> // local[*] = estimation of the machine's cores).
> // ** the configuration "spark.default.parallelism" takes precedence over all of the above **
> So in most cases this default is quite small. This is an issue when using a
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions that allows
> you to change this DEFAULT_BUNDLE_SIZE.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=150800=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150800 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 03/Oct/18 16:06
Start Date: 03/Oct/18 16:06
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r221656872

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java ##
@@ -99,6 +99,12 @@ public String create(PipelineOptions options) {
   void setCheckpointDurationMillis(Long durationMillis);
+  @Description("Bundle Size for splitting BoundedSources.")
+  @Default.Long(0)
+  long getBundleSize();

Review comment:
Can you make the get/set type not a primitive for consistency with the other options.

Issue Time Tracking
---
Worklog Id: (was: 150800)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Reporter: Kyle Winkelman
> Assignee: Kyle Winkelman
> Priority: Major
> Time Spent: 4h 20m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=150797=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150797 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 03/Oct/18 16:06
Start Date: 03/Oct/18 16:06
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r221661790

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java ##
@@ -68,6 +69,32 @@
         TranslationUtils.functionToFlatMapFunction(WindowingHelpers.windowFunction()), true);
   }
+  /**
+   * An implementation of {@link
+   * org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly} for the Spark runner.
+   * Used only when bundleSize is set in SparkPipelineOptions. Evaluating if the default Partitioner
+   * causes a reshuffle of the data.
+   */
+  @Experimental

Review comment:
Can you please remove this annotation from here and put it on `SparkPipelineOptions.setBundleSize`? The internal implementations of runners do not so far have a stable API, so we have no issues here. It is the options that users can depend on, and what I wanted to highlight is that this knob can disappear if we find a better compromise or if we can make a good default without it.

Issue Time Tracking
---
Worklog Id: (was: 150797)
Time Spent: 4h 10m (was: 4h)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Reporter: Kyle Winkelman
> Assignee: Kyle Winkelman
> Priority: Major
> Time Spent: 4h 10m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=150798=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150798 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 03/Oct/18 16:06
Start Date: 03/Oct/18 16:06
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r221672729

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java ##
@@ -68,6 +69,32 @@
         TranslationUtils.functionToFlatMapFunction(WindowingHelpers.windowFunction()), true);
   }
+  /**
+   * An implementation of {@link
+   * org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly} for the Spark runner.
+   * Used only when bundleSize is set in SparkPipelineOptions. Evaluating if the default Partitioner
+   * causes a reshuffle of the data.
+   */
+  @Experimental
+  public static

Review comment:
If the only difference from the other method is the partitioner, maybe we can pass it as a parameter instead (to avoid repeated code).

Issue Time Tracking
---
Worklog Id: (was: 150798)
Time Spent: 4h 20m (was: 4h 10m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Reporter: Kyle Winkelman
> Assignee: Kyle Winkelman
> Priority: Major
> Time Spent: 4h 20m
> Remaining Estimate: 0h
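The refactoring suggested in this review comment (one grouping method that takes the partitioner as a parameter, rather than two near-identical methods) could look roughly like the sketch below. It uses plain Java collections instead of Spark RDDs so that it runs on its own; KeyPartitioner, groupByKeyOnly and pair are hypothetical names for illustration and are not taken from GroupCombineFunctions. A null partitioner stands in for "use the default partitioner".

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class GroupByKeySketch {
  /** Hypothetical stand-in for a partitioner: maps a key to a partition index. */
  interface KeyPartitioner<K> {
    int partitionFor(K key, int numPartitions);
  }

  /** Small helper to build key/value pairs for the example. */
  static <K, V> Map.Entry<K, V> pair(K key, V value) {
    return new AbstractMap.SimpleEntry<>(key, value);
  }

  /**
   * Single grouping routine. Passing null selects a hash-based default, so callers
   * that want a custom partitioner and callers that do not share the same code path.
   */
  static <K, V> List<Map<K, List<V>>> groupByKeyOnly(
      List<Map.Entry<K, V>> pairs, int numPartitions, KeyPartitioner<K> partitioner) {
    KeyPartitioner<K> effective =
        partitioner != null
            ? partitioner
            : (key, n) -> Math.floorMod(Objects.hashCode(key), n);
    List<Map<K, List<V>>> partitions = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      partitions.add(new LinkedHashMap<>());
    }
    for (Map.Entry<K, V> entry : pairs) {
      int index = effective.partitionFor(entry.getKey(), numPartitions);
      partitions
          .get(index)
          .computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
          .add(entry.getValue());
    }
    return partitions;
  }
}
```

The design point is only that the partitioner becomes data passed in by the caller; how this maps onto the actual Spark calls in the runner is left to the PR.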
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=150799=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150799 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 03/Oct/18 16:06
Start Date: 03/Oct/18 16:06
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r221660123

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java ##
@@ -99,6 +99,12 @@ public String create(PipelineOptions options) {
   void setCheckpointDurationMillis(Long durationMillis);
+  @Description("Bundle Size for splitting BoundedSources.")

Review comment:
Probably worth documenting the behavior (if set otherwise ...). Also mention that the spark.* variables need to be set in the environment (for dynamic allocation).

Issue Time Tracking
---
Worklog Id: (was: 150799)
Time Spent: 4h 20m (was: 4h 10m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Reporter: Kyle Winkelman
> Assignee: Kyle Winkelman
> Priority: Major
> Time Spent: 4h 20m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=150284=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150284 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 02/Oct/18 02:01
Start Date: 02/Oct/18 02:01
Worklog Time Spent: 10m
Work Description: amitsela commented on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-426121773

As for `BEAM-5519`, I'll reply in the JIRA ticket.

Issue Time Tracking
---
Worklog Id: (was: 150284)
Time Spent: 4h (was: 3h 50m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Reporter: Kyle Winkelman
> Assignee: Kyle Winkelman
> Priority: Major
> Time Spent: 4h
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=150282=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150282 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 02/Oct/18 02:00
Start Date: 02/Oct/18 02:00
Worklog Time Spent: 10m
Work Description: amitsela commented on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-426121506

@iemejia @kyle-winkelman I agree that for batch, the effort to keep the "default partitioner" is meaningless, and so I believe it's fine to allow setting the bundle size via `PipelineOptions` (although default should remain the default partitioner).

Issue Time Tracking
---
Worklog Id: (was: 150282)
Time Spent: 3h 50m (was: 3h 40m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Reporter: Kyle Winkelman
> Assignee: Kyle Winkelman
> Priority: Major
> Time Spent: 3h 50m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=150051=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150051 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 01/Oct/18 14:36
Start Date: 01/Oct/18 14:36
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-425931207

Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 150051)
Time Spent: 3h 40m (was: 3.5h)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Affects Versions: 2.5.0
> Reporter: Kyle Winkelman
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Labels: newbie
> Time Spent: 3h 40m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=148910=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-148910 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 27/Sep/18 21:21
Start Date: 27/Sep/18 21:21
Worklog Time Spent: 10m
Work Description: kyle-winkelman edited a comment on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-425201311

Looking further into the StreamingTransformTranslator, I would like to pose a question. Why do we do the groupByKey followed by the updateStateByKey? It appears to be a giant waste in which we convert everything to bytes and back unnecessarily. The only thing it does is gather all the values for a key into an Iterable, but the updateStateByKey would also do that if it were given the chance. If we were to update the UpdateStateByKeyFunction to expect WindowedValues instead of Iterables of WindowedValues, I believe we could eliminate the call to groupByKey. What is happening now is that the updateStateByKey wraps those values in a Seq, so currently we have either an empty Seq or a Seq with exactly 1 item, and that item is itself an Iterable that contains multiple items.

**UPDATE: I have created a separate jira to look into this. [BEAM-5519](https://jira.apache.org/jira/browse/BEAM-5519)**

Issue Time Tracking
---
Worklog Id: (was: 148910)
Time Spent: 3.5h (was: 3h 20m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Affects Versions: 2.5.0
> Reporter: Kyle Winkelman
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Labels: newbie
> Time Spent: 3.5h
> Remaining Estimate: 0h
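The data shapes discussed in this comment can be made concrete with a short plain-Java sketch. It models Spark's Seq as a java.util.List and leaves out windowing and serialization entirely, so it only illustrates the nesting being described; unwrapGrouped and direct are hypothetical names, not methods of the Spark runner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class UpdateStateShapeSketch {
  /**
   * Shape described as the current one: the per-key sequence is either empty or
   * holds exactly one element, and that element is itself an Iterable of values
   * (the result of the preceding groupByKey).
   */
  static <V> List<V> unwrapGrouped(List<Iterable<V>> newValues) {
    List<V> out = new ArrayList<>();
    for (Iterable<V> group : newValues) {
      for (V value : group) {
        out.add(value);
      }
    }
    return out;
  }

  /** Shape proposed in the comment: the per-key sequence holds the values directly. */
  static <V> List<V> direct(List<V> newValues) {
    return new ArrayList<>(newValues);
  }

  public static void main(String[] args) {
    List<Iterable<String>> nested =
        Collections.<Iterable<String>>singletonList(Arrays.asList("a", "b", "c"));
    List<String> flat = Arrays.asList("a", "b", "c");
    // Both shapes carry the same values; only the extra level of nesting differs.
    System.out.println(unwrapGrouped(nested).equals(direct(flat)));
  }
}
```

Whether the extra groupByKey can actually be dropped in streaming mode is the open question tracked in BEAM-5519; the sketch does not answer it.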
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=148849=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-148849 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 27/Sep/18 18:44
Start Date: 27/Sep/18 18:44
Worklog Time Spent: 10m
Work Description: kyle-winkelman commented on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-425201311

Looking further into the StreamingTransformTranslator, I would like to pose a question. Why do we do the groupByKey followed by the updateStateByKey? It appears to be a giant waste in which we convert everything to bytes and back unnecessarily. The only thing it does is gather all the values for a key into an Iterable, but the updateStateByKey would also do that if it were given the chance. If we were to update the UpdateStateByKeyFunction to expect WindowedValues instead of Iterables of WindowedValues, I believe we could eliminate the call to groupByKey. What is happening now is that the updateStateByKey wraps those values in a Seq, so currently we have either an empty Seq or a Seq with exactly 1 item, and that item is itself an Iterable that contains multiple items.

Issue Time Tracking
---
Worklog Id: (was: 148849)
Time Spent: 3h 20m (was: 3h 10m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Affects Versions: 2.5.0
> Reporter: Kyle Winkelman
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Labels: newbie
> Time Spent: 3h 20m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=148801=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-148801 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 27/Sep/18 16:12
Start Date: 27/Sep/18 16:12
Worklog Time Spent: 10m
Work Description: kyle-winkelman commented on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-425152159

I don't know how to proceed. I am convinced that in batch mode my proposal is the correct way to proceed.

Another example of a silly interaction that occurs due to using defaultParallelism in SourceRDD is reading 2 different files. If one of the two files is a couple of orders of magnitude larger, you will need to allocate enough resources to the job to read the larger file, let's say n cores. The smaller file will then also get split into n pieces, which results in the smaller file being broken up into many very small bundles.

The issue is that I do not understand the repercussions this change will have on the streaming mode. Maybe we will need to have two different approaches to the groupBy logic, one for each mode.

I am ok with this being experimental and only working if you supply the --bundleSize to the pipeline options. I would like an answer to the last question I asked, to understand whether in batch mode I can always use the new experimental groupByKeyOnlyDefaultPartitioner, because I believe it will not cause a double shuffle in batch mode.

Other than that I believe I need a code review, and to make sure everyone agrees with the approach. If this is not agreed upon I would hope someone could give me some advice on how to get the SparkRunner to work with dynamicAllocation. (Starting with 2 cores and spinning up more if the files are large and are split into more bundles.)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 148801)
Time Spent: 3h 10m (was: 3h)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Affects Versions: 2.5.0
> Reporter: Kyle Winkelman
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Labels: newbie
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It
> then falls back to the value calculated in this description:
> // when running on YARN/SparkDeploy it's the result of max(totalCores, 2).
> // when running on Mesos it's 8.
> // when running local it's the total number of cores (local = 1, local[N] = N,
> // local[*] = estimation of the machine's cores).
> // ** the configuration "spark.default.parallelism" takes precedence over all of the above **
> So in most cases this default is quite small. This is an issue when using a
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions that allows
> you to change this DEFAULT_BUNDLE_SIZE.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
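The two-file interaction described in the comment above can be made concrete with a little arithmetic. The following is a minimal sketch, not the runner's code: the class and helper names are hypothetical, the file sizes and core count are made up for illustration, and it assumes a source honours the desired bundle size exactly (a real `BoundedSource.split` may treat it only as a hint).

```java
/** Illustrative sketch only; names and numbers are assumptions, not Beam code. */
final class BundleSizingSketch {
  static final long MIB = 1024L * 1024L;

  // Behaviour quoted in the diff: estimated size divided by defaultParallelism.
  static long desiredSizeByParallelism(long estimatedSizeBytes, int numPartitions) {
    return estimatedSizeBytes / numPartitions;
  }

  // Hypothetical helper: bundle count if the desired size were honoured exactly (ceiling division).
  static long approxBundleCount(long estimatedSizeBytes, long desiredSizeBytes) {
    return (estimatedSizeBytes + desiredSizeBytes - 1) / desiredSizeBytes;
  }

  public static void main(String[] args) {
    long smallFile = 100 * MIB;        // assumed size of the small input
    long largeFile = 100 * 1024 * MIB; // assumed size of the large input (100 GiB)
    int cores = 200;                   // assumed "n cores" sized for the large file
    long fixedBundle = 64 * MIB;       // the 64MB figure mentioned in the thread

    long tiny = desiredSizeByParallelism(smallFile, cores);
    System.out.println("small file, by parallelism: " + approxBundleCount(smallFile, tiny)
        + " bundles of " + tiny + " bytes");
    System.out.println("small file, fixed bundle size: "
        + approxBundleCount(smallFile, fixedBundle) + " bundles");
    System.out.println("large file, fixed bundle size: "
        + approxBundleCount(largeFile, fixedBundle) + " bundles");
  }
}
```

Under these assumptions the small file is cut into 200 bundles of 512 KiB each when split by parallelism, but into only 2 bundles with a fixed 64 MiB bundle size, while the large file still yields 1600 bundles.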
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=148556=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-148556 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 27/Sep/18 01:46
Start Date: 27/Sep/18 01:46
Worklog Time Spent: 10m
Work Description: aaltay commented on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-424928440

What is the status of this PR?

Issue Time Tracking
---
Worklog Id: (was: 148556)
Time Spent: 3h (was: 2h 50m)
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=144402=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144402 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 14/Sep/18 18:43
Start Date: 14/Sep/18 18:43
Worklog Time Spent: 10m
Work Description: kyle-winkelman commented on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-421449558

After looking further into [BEAM-1815](https://jira.apache.org/jira/browse/BEAM-1815), my understanding is that the double shuffle happened in streaming mode only, because there is the groupByKey in GroupCombineFunctions.groupByKeyOnly followed by updateStateByKey in SparkGroupAlsoByWindowViaWindowSet.groupAlsoByWindow. This will not happen in batch mode, so I believe we should always use groupByKey() without the HashPartitioner when in batch mode. That way we don't squash a large amount of data into a few partitions.

If you agree I will change (in TransformTranslator):

```java
JavaRDD groupedByKey;
if (context.getSerializableOptions().get().as(SparkPipelineOptions.class).getBundleSize() > 0) {
  groupedByKey = GroupCombineFunctions.groupByKeyOnlyDefaultPartitioner(inRDD, keyCoder, wvCoder);
} else {
  groupedByKey = GroupCombineFunctions.groupByKeyOnly(inRDD, keyCoder, wvCoder);
}
```

to:

```java
JavaRDD groupedByKey = GroupCombineFunctions.groupByKeyOnlyDefaultPartitioner(inRDD, keyCoder, wvCoder);
```

Issue Time Tracking
---
Worklog Id: (was: 144402)
Time Spent: 2h 50m (was: 2h 40m)
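The concern about squashing data into a few partitions can be illustrated without Spark. The sketch below is a simplification under stated assumptions: `partitionFor` is a hypothetical helper that mimics generic hash partitioning (non-negative `hashCode` modulo the partition count), and the key range and partition counts are invented for the example. It is not the SparkRunner's or Spark's actual code.

```java
import java.util.HashSet;
import java.util.Set;

/** Illustrative sketch only; mimics generic hash partitioning, not Spark internals. */
final class HashPartitionSketch {

  // Hypothetical helper: non-negative modulo of the key's hash code.
  static int partitionFor(Object key, int numPartitions) {
    int mod = key.hashCode() % numPartitions;
    return mod < 0 ? mod + numPartitions : mod;
  }

  // Counts how many distinct partitions the integer keys 0..numKeys-1 land in.
  static int distinctPartitionsUsed(int numKeys, int numPartitions) {
    Set<Integer> used = new HashSet<>();
    for (int key = 0; key < numKeys; key++) {
      used.add(partitionFor(key, numPartitions));
    }
    return used.size();
  }

  public static void main(String[] args) {
    int keys = 1000;
    // A partitioner sized by a small defaultParallelism (assumed to be 2 here).
    System.out.println("partitions used with 2: " + distinctPartitionsUsed(keys, 2));
    // A partitioner sized like an upstream RDD with many partitions (assumed 160).
    System.out.println("partitions used with 160: " + distinctPartitionsUsed(keys, 160));
  }
}
```

With a partitioner fixed at 2 partitions, every key ends up in one of two partitions regardless of how finely the source was split, which is the effect the comment warns about.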
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=144397=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144397 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 14/Sep/18 18:36
Start Date: 14/Sep/18 18:36
Worklog Time Spent: 10m
Work Description: kyle-winkelman commented on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-421447794

In my use case we use spark.dynamicAllocation as a way to remove a knob (--num-executors) in our attempt to become knobless; when running in batch mode it will create the SourceRDDs and, based on the number of partitions, it will try to spin up that many executors. This completely backfires when the SourceRDD is partitioned based on defaultParallelism, because that will now be equal to 2 (default --num-executors). If you prefer, we could prevent the bundleSize from being a knob and always use 64MB (Apache Hadoop default block size).

I understand why streaming acts in this way, but for batch the users are going to have to guess how many executors they need. If they do not guess high enough it is entirely possible to end up with >2GB of data in a partition (https://issues.apache.org/jira/browse/SPARK-6235). Starting at 64MB per partition does not eliminate this possibility, but it does reduce the chances. For example, if a user read a 10GB file with 1 executor it would fail if it ever tried to cache the partition, but by breaking it into 64MB partitions it has a chance of succeeding (depending on executor memory, etc.).

Issue Time Tracking
---
Worklog Id: (was: 144397)
Time Spent: 2h 40m (was: 2.5h)
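The 10GB example in the comment above can be checked numerically. This is a rough sketch under stated assumptions: the class and method names are hypothetical, the per-partition ceiling is assumed to be about `Integer.MAX_VALUE` bytes (the ">2GB" figure the comment ties to SPARK-6235), and the input is assumed to divide evenly across partitions.

```java
/** Illustrative sketch only; the limit and sizes are assumptions taken from the discussion. */
final class PartitionLimitSketch {
  static final long MIB = 1024L * 1024L;
  static final long GIB = 1024L * MIB;

  // Assumption: the per-partition ceiling is roughly Integer.MAX_VALUE bytes (about 2 GiB).
  static final long ASSUMED_LIMIT_BYTES = Integer.MAX_VALUE;

  // Ceiling division: bytes each partition holds if the input is spread evenly.
  static long bytesPerPartition(long inputBytes, long partitions) {
    return (inputBytes + partitions - 1) / partitions;
  }

  // Ceiling division: partitions needed for a given target bundle size.
  static long partitionsFor(long inputBytes, long bundleSizeBytes) {
    return (inputBytes + bundleSizeBytes - 1) / bundleSizeBytes;
  }

  static boolean exceedsAssumedLimit(long partitionBytes) {
    return partitionBytes > ASSUMED_LIMIT_BYTES;
  }

  public static void main(String[] args) {
    long input = 10 * GIB;

    // defaultParallelism of 2, as described for a job starting with minimal executors.
    long coarse = bytesPerPartition(input, 2);
    System.out.println("2 partitions: " + coarse + " bytes each, over limit: "
        + exceedsAssumedLimit(coarse));

    // Fixed 64 MiB bundles.
    long count = partitionsFor(input, 64 * MIB);
    long fine = bytesPerPartition(input, count);
    System.out.println(count + " partitions: " + fine + " bytes each, over limit: "
        + exceedsAssumedLimit(fine));
  }
}
```

Under these assumptions, splitting 10 GiB in two gives 5 GiB per partition, above the assumed ceiling, while 64 MiB bundles give 160 partitions that each stay well below it.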
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=144336=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144336 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 14/Sep/18 16:34
Start Date: 14/Sep/18 16:34
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-421414299

I don't believe the Dataflow worker code is very useful for dynamic work rebalancing. Good dynamic work rebalancing will need support/signals from each runner.

I believe there is a way to build a simple dynamic work rebalancing system that would work for all bounded splits by performing a limited amount of graph rewriting at pipeline submission time and then periodic splitting while running sources. You need support for a self loop within the runner to be able to get support for unbounded sources.

Issue Time Tracking
---
Worklog Id: (was: 144336)
Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=144329=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144329 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 14/Sep/18 16:15
Start Date: 14/Sep/18 16:15
Worklog Time Spent: 10m
Work Description: chamikaramj commented on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-421408768

I believe source/worker plumbing for dynamic work rebalancing will be shared which might be helpful in implementing dynamic work rebalancing. Rest of the code is in Dataflow service.

Issue Time Tracking
---
Worklog Id: (was: 144329)
Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=144308=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144308 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 14/Sep/18 14:53
Start Date: 14/Sep/18 14:53
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-421384240

@chamikaramj will some of the DWR stuff be shared as part of the Dataflow worker donation? Otherwise I don't see an easy way to support this (the only time we seriously thought about this with Amit, we concluded that it could require changes in the Spark code).

Issue Time Tracking
---
Worklog Id: (was: 144308)
Time Spent: 2h 10m (was: 2h)
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=144297=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144297 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 14/Sep/18 14:26
Start Date: 14/Sep/18 14:26
Worklog Time Spent: 10m
Work Description: chamikaramj commented on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-421375360

Sorry, I missed this PR. Had a quick look.

I think the proper solution is to introduce dynamic work rebalancing [1] to SparkRunner at some point. This way large bundles can be broken up into smaller bundles if there are more workers to process work.

I agree with Ismaël that the proposed solution goes against Beam's no-knobs philosophy, but I understand why it might be needed until SparkRunner has support for dynamic work rebalancing. I'd suggest performing some experimentation to make sure that the new option helps before introducing it.

[1] https://cloud.google.com/blog/products/gcp/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow

Issue Time Tracking
---
Worklog Id: (was: 144297)
Time Spent: 2h (was: 1h 50m)
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=144271=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144271 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 14/Sep/18 12:51
Start Date: 14/Sep/18 12:51
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-421348635

Hi, I have not forgotten about this one (sorry for the delay).

The default parallelism is calculated to use the 'optimal' number of cores and I think it is a reasonable default (it maximizes core utilization, in particular for streaming). I prefer not to change this until we have a better way to replace the default value (if you have any suggestion on how to do this with the new approach, it is welcome).

I want to include your changes, but not as the default for the moment. Let's say as an 'alternative' that is only applied if the user sets the bundle size (we have to document the partitioner change and mark this method @Experimental). This way we can evaluate whether double shuffles happen or not, and eventually whether the performance advantages justify making this behavior the default. WDYT?

Beam's design philosophy has always been to reduce 'knobs' to a minimum, but I understand that with Spark this might sometimes be needed.

Issue Time Tracking
---
Worklog Id: (was: 144271)
Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=142173=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142173 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 07/Sep/18 13:01
Start Date: 07/Sep/18 13:01
Worklog Time Spent: 10m
Work Description: kyle-winkelman commented on a change in pull request #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r215950437

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
## @@ -52,13 +50,11 @@
 .map(WindowingHelpers.unwindowFunction())
 .mapToPair(TranslationUtils.toPairFunction())
 .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder));
-// use a default parallelism HashPartitioner.
-Partitioner partitioner = new HashPartitioner(rdd.rdd().sparkContext().defaultParallelism());
 // using mapPartitions allows to preserve the partitioner
 // and avoid unnecessary shuffle downstream.
 return pairRDD
-.groupByKey(partitioner)
+.groupByKey()

Review comment:
If I am incorrect, how would we know whether to use the defaultParallelism or some other value? I don't think it would be appropriate to force a SourceRDD, that may have had hundreds of partitions, into the defaultParallelism number of partitions, which may be quite small, as this may result in too much data being in each partition.

Issue Time Tracking
---
Worklog Id: (was: 142173)
Time Spent: 1h 40m (was: 1.5h)
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=142171=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142171 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 07/Sep/18 12:50
Start Date: 07/Sep/18 12:50
Worklog Time Spent: 10m
Work Description: kyle-winkelman commented on a change in pull request #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r215947597

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
## @@ -52,13 +50,11 @@
 .map(WindowingHelpers.unwindowFunction())
 .mapToPair(TranslationUtils.toPairFunction())
 .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder));
-// use a default parallelism HashPartitioner.
-Partitioner partitioner = new HashPartitioner(rdd.rdd().sparkContext().defaultParallelism());
 // using mapPartitions allows to preserve the partitioner
 // and avoid unnecessary shuffle downstream.
 return pairRDD
-.groupByKey(partitioner)
+.groupByKey()

Review comment:
To me it appears the double shuffling was solved by changing map and mapToPair to mapPartitions and mapPartitionsToPair, respectively. I don't think the explicit declaration of the HashPartitioner has any effect.

Issue Time Tracking
---
Worklog Id: (was: 142171)
Time Spent: 1.5h (was: 1h 20m)
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=142165=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142165 ] ASF GitHub Bot logged work on BEAM-4783: Author: ASF GitHub Bot Created on: 07/Sep/18 12:36 Start Date: 07/Sep/18 12:36 Worklog Time Spent: 10m Work Description: kyle-winkelman commented on a change in pull request #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources. URL: https://github.com/apache/beam/pull/6181#discussion_r215943999 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java ## @@ -80,35 +81,16 @@ public Bounded( super(sc, NIL, JavaSparkContext$.MODULE$.fakeClassTag()); this.source = source; this.options = options; - // the input parallelism is determined by Spark's scheduler backend. - // when running on YARN/SparkDeploy it's the result of max(totalCores, 2). - // when running on Mesos it's 8. - // when running local it's the total number of cores (local = 1, local[N] = N, - // local[*] = estimation of the machine's cores). 
- // ** the configuration "spark.default.parallelism" takes precedence over all of the above ** - this.numPartitions = sc.defaultParallelism(); - checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero."); + this.bundleSize = options.get().as(SparkPipelineOptions.class).getBundleSize(); + checkArgument(this.bundleSize > 0, "Bundle size must be greater than zero."); this.stepName = stepName; this.metricsAccum = MetricsAccumulator.getInstance(); } -private static final long DEFAULT_BUNDLE_SIZE = 64L * 1024L * 1024L; - @Override public Partition[] getPartitions() { - long desiredSizeBytes = DEFAULT_BUNDLE_SIZE; - try { -desiredSizeBytes = source.getEstimatedSizeBytes(options.get()) / numPartitions; - } catch (Exception e) { -LOG.warn( -"Failed to get estimated bundle size for source {}, using default bundle " -+ "size of {} bytes.", -source, -DEFAULT_BUNDLE_SIZE); - } try { -List> partitionedSources = -source.split(desiredSizeBytes, options.get()); +List> partitionedSources = source.split(bundleSize, options.get()); Review comment: The knob based approach falls more in line with what Spark users would expect. If they write native spark with an HDFS input like `sc.textFile("hdfs:///input/war-and-peace.txt")`, they wouldn't expect the file to be broken up differently depending on the number of cores they run with. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 142165)
Time Spent: 1h 20m (was: 1h 10m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
>
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Affects Versions: 2.5.0
> Reporter: Kyle Winkelman
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Labels: newbie
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It
> then falls back to the value calculated in this description:
> // when running on YARN/SparkDeploy it's the result of max(totalCores, 2).
> // when running on Mesos it's 8.
> // when running local it's the total number of cores (local = 1, local[N] = N,
> // local[*] = estimation of the machine's cores).
> // ** the configuration "spark.default.parallelism" takes precedence over all of the above **
> So in most cases this default is quite small. This is an issue when using a
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions that allows
> you to change this DEFAULT_BUNDLE_SIZE.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
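To make the "only get split in half" concern concrete, here is a minimal sketch of the arithmetic, not the runner's actual code. The class and method names are hypothetical, and it assumes an idealized source that cuts itself into pieces of exactly the desired size (real `BoundedSource` implementations treat the desired size only as a hint).

```java
// Hypothetical sketch: compares how many bundles a source yields under the two strategies.
final class SplitCountSketch {
  static final long MIB = 1024L * 1024L;

  /** Bundles produced when totalBytes is cut into pieces of at most desiredBytes (idealized). */
  static long bundleCount(long totalBytes, long desiredBytes) {
    if (totalBytes < 0 || desiredBytes <= 0) {
      throw new IllegalArgumentException("sizes must be non-negative / positive");
    }
    // Ceiling division; an empty source still yields one bundle in this model.
    return Math.max(1L, (totalBytes + desiredBytes - 1) / desiredBytes);
  }

  /** Parallelism-based strategy: desired size = estimated size / defaultParallelism. */
  static long parallelismBasedCount(long totalBytes, int defaultParallelism) {
    if (defaultParallelism <= 0) {
      throw new IllegalArgumentException("defaultParallelism must be positive");
    }
    long desiredBytes = Math.max(1L, totalBytes / defaultParallelism);
    return bundleCount(totalBytes, desiredBytes);
  }

  public static void main(String[] args) {
    long tenGiB = 10L * 1024L * MIB;
    // With a default parallelism of 2, the file is cut in half regardless of its size.
    System.out.println("parallelism-based: " + parallelismBasedCount(tenGiB, 2));
    // With a fixed 64 MiB bundle size, a larger file yields more bundles.
    System.out.println("bundle-size-based: " + bundleCount(tenGiB, 64L * MIB));
  }
}
```

Under these assumptions the bundle count grows with the input size in the second case, while in the first case it stays pinned to the parallelism value and only the bundles grow.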
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=142121=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142121 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 07/Sep/18 10:14
Start Date: 07/Sep/18 10:14
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r215912675

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
##
@@ -52,13 +50,11 @@
             .map(WindowingHelpers.unwindowFunction())
             .mapToPair(TranslationUtils.toPairFunction())
             .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder));
-    // use a default parallelism HashPartitioner.
-    Partitioner partitioner = new HashPartitioner(rdd.rdd().sparkContext().defaultParallelism());
     // using mapPartitions allows to preserve the partitioner
     // and avoid unnecessary shuffle downstream.
     return pairRDD
-        .groupByKey(partitioner)
+        .groupByKey()

Review comment:
There is a previous issue on double shuffling that was solved by explicitly passing the partitioner [BEAM-1815](https://jira.apache.org/jira/browse/BEAM-1815). I am afraid this change has the side effect of bringing this issue back.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 142121)
Time Spent: 1h 10m (was: 1h)
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=142120=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142120 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 07/Sep/18 10:14
Start Date: 07/Sep/18 10:14
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r215893873

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
##
@@ -80,35 +81,16 @@ public Bounded(
       super(sc, NIL, JavaSparkContext$.MODULE$.fakeClassTag());
       this.source = source;
       this.options = options;
-      // the input parallelism is determined by Spark's scheduler backend.
-      // when running on YARN/SparkDeploy it's the result of max(totalCores, 2).
-      // when running on Mesos it's 8.
-      // when running local it's the total number of cores (local = 1, local[N] = N,
-      // local[*] = estimation of the machine's cores).
-      // ** the configuration "spark.default.parallelism" takes precedence over all of the above **
-      this.numPartitions = sc.defaultParallelism();
-      checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
+      this.bundleSize = options.get().as(SparkPipelineOptions.class).getBundleSize();
+      checkArgument(this.bundleSize > 0, "Bundle size must be greater than zero.");
       this.stepName = stepName;
       this.metricsAccum = MetricsAccumulator.getInstance();
     }

-    private static final long DEFAULT_BUNDLE_SIZE = 64L * 1024L * 1024L;
-
     @Override
     public Partition[] getPartitions() {
-      long desiredSizeBytes = DEFAULT_BUNDLE_SIZE;
-      try {
-        desiredSizeBytes = source.getEstimatedSizeBytes(options.get()) / numPartitions;
-      } catch (Exception e) {
-        LOG.warn(
-            "Failed to get estimated bundle size for source {}, using default bundle "
-                + "size of {} bytes.",
-            source,
-            DEFAULT_BUNDLE_SIZE);
-      }
       try {
-        List> partitionedSources =
-            source.split(desiredSizeBytes, options.get());
+        List> partitionedSources = source.split(bundleSize, options.get());

Review comment:
What is the argument for replacing the default approach with a more knob-based one? Maybe it is better to preserve the default and only override it if this is parametrized.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 142120)
Time Spent: 1h (was: 50m)
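The suggestion to "preserve the default and only override it if this is parametrized" could look roughly like the sketch below. This is an illustration under assumptions, not the code that was merged: the class name, the method, and the use of a `LongSupplier` in place of `source.getEstimatedSizeBytes(options.get())` are all hypothetical, and a bundle size of 0 is taken to mean "not set".

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: choose the desired split size, keeping the old behaviour as the default.
final class DesiredBundleSizeSketch {
  // Same constant as in the diff above: 64 MiB.
  static final long DEFAULT_BUNDLE_SIZE = 64L * 1024L * 1024L;

  /**
   * @param bundleSize user-supplied bundle size in bytes; 0 is assumed to mean "not set"
   * @param numPartitions Spark default parallelism, must be positive
   * @param estimatedSizeBytes stand-in for the source's size estimate; may throw
   */
  static long desiredSizeBytes(long bundleSize, int numPartitions, LongSupplier estimatedSizeBytes) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("Number of partitions must be greater than zero.");
    }
    if (bundleSize > 0) {
      // The knob was set explicitly, so it wins.
      return bundleSize;
    }
    try {
      // Default behaviour: spread the estimated size over the default parallelism.
      return estimatedSizeBytes.getAsLong() / numPartitions;
    } catch (RuntimeException e) {
      // No estimate available: fall back to the fixed default, as the removed code did.
      return DEFAULT_BUNDLE_SIZE;
    }
  }

  public static void main(String[] args) {
    System.out.println(desiredSizeBytes(0L, 4, () -> 1000L));
    System.out.println(desiredSizeBytes(128L, 4, () -> 1000L));
  }
}
```

With this shape, users who never touch the option see no change in behaviour, while users running with dynamic allocation can opt in to fixed-size bundles.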
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=141831=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141831 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 06/Sep/18 16:16
Start Date: 06/Sep/18 16:16
Worklog Time Spent: 10m
Work Description: kyle-winkelman commented on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-419153980

ping @chamikaramj

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 141831)
Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=138431=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-138431 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 27/Aug/18 15:17
Start Date: 27/Aug/18 15:17
Worklog Time Spent: 10m
Work Description: kyle-winkelman commented on a change in pull request #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r212977881

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
##
@@ -52,13 +50,11 @@
             .map(WindowingHelpers.unwindowFunction())
             .mapToPair(TranslationUtils.toPairFunction())
             .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder));
-    // use a default parallelism HashPartitioner.
-    Partitioner partitioner = new HashPartitioner(rdd.rdd().sparkContext().defaultParallelism());
     // using mapPartitions allows to preserve the partitioner
     // and avoid unnecessary shuffle downstream.
     return pairRDD
-        .groupByKey(partitioner)
+        .groupByKey()

Review comment:
This will cause the [Partitioner.defaultPartitioner](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L62) to be used. When called on a SourceRDD this should be a HashPartitioner with the number of partitions equal to the number of splits created by the bundleSize. When called on a SourceDStream this should be a HashPartitioner with the number of partitions equal to the defaultParallelism.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 138431)
Time Spent: 40m (was: 0.5h)
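The partition count that a no-argument `groupByKey()` ends up with can be modelled as below. This is a simplified sketch of how Spark's `Partitioner.defaultPartitioner` picks a partition count as I understand it, written in plain Java with hypothetical names; it deliberately ignores the case where Spark reuses an existing partitioner from an upstream RDD.

```java
import java.util.OptionalInt;
import java.util.stream.IntStream;

// Hypothetical model of the partition count chosen for a default HashPartitioner.
final class DefaultPartitionCountSketch {
  /**
   * @param configuredDefaultParallelism value of spark.default.parallelism, if it was set
   * @param upstreamPartitionCounts partition counts of the RDDs feeding the shuffle
   */
  static int defaultNumPartitions(
      OptionalInt configuredDefaultParallelism, int... upstreamPartitionCounts) {
    if (upstreamPartitionCounts.length == 0) {
      throw new IllegalArgumentException("at least one upstream RDD is required");
    }
    if (configuredDefaultParallelism.isPresent()) {
      // An explicit spark.default.parallelism takes precedence.
      return configuredDefaultParallelism.getAsInt();
    }
    // Otherwise the largest upstream partition count is used.
    return IntStream.of(upstreamPartitionCounts).max().getAsInt();
  }

  public static void main(String[] args) {
    // A SourceRDD that was split into 160 bundles, no explicit parallelism configured.
    System.out.println(defaultNumPartitions(OptionalInt.empty(), 160));
    // The same RDD with spark.default.parallelism=8 set.
    System.out.println(defaultNumPartitions(OptionalInt.of(8), 160));
  }
}
```

In this model the shuffle follows the number of splits only when `spark.default.parallelism` is left unset, which is worth keeping in mind when reading the comment above.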
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=137965=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137965 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 24/Aug/18 20:43
Start Date: 24/Aug/18 20:43
Worklog Time Spent: 10m
Work Description: apilloud commented on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-415877315

Java PreCommits are failing due to unused imports. Can you run `./gradlew :beam-runners-spark:spotlessApply`?

ping @chamikaramj

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 137965)
Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=136159=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-136159 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 20/Aug/18 16:12
Start Date: 20/Aug/18 16:12
Worklog Time Spent: 10m
Work Description: pabloem commented on issue #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-414373896

I've asked @chamikaramj to take a look, and he said he would. Just know this PR is being followed-up on.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 136159)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind
[ https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=132486=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-132486 ]

ASF GitHub Bot logged work on BEAM-4783:

Author: ASF GitHub Bot
Created on: 08/Aug/18 17:24
Start Date: 08/Aug/18 17:24
Worklog Time Spent: 10m
Work Description: kyle-winkelman opened a new pull request #6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181

When running a workflow with the Spark Runner with BoundedSources, files are split only according to the defaultParallelism. When accessing HDFS in native Spark it defaults to partitioning based on the HDFS block size (my understanding). I believe that there should be a configurable bundleSize to split BoundedSources, so that a larger file yields more splits rather than larger splits, as happens now.

Spark Runner Code Owner: @jbonofre

Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it.
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 132486)
Time Spent: 10m
Remaining Estimate: 0h

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
>