[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-10-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=151550=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151550
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 05/Oct/18 09:01
Start Date: 05/Oct/18 09:01
Worklog Time Spent: 10m 
  Work Description: iemejia closed pull request #6181: [BEAM-4783] Add 
bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 7a7f9902db2..651e5223721 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.spark;
 
 import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -99,6 +100,16 @@ public String create(PipelineOptions options) {
 
   void setCheckpointDurationMillis(Long durationMillis);
 
+  @Description(
+      "If set bundleSize will be used for splitting BoundedSources, otherwise default to "
+          + "splitting BoundedSources on Spark defaultParallelism. Most effective when used with "
+          + "Spark dynamicAllocation.")
+  @Default.Long(0)
+  Long getBundleSize();
+
+  @Experimental
+  void setBundleSize(Long value);
+
   @Description("Enable/disable sending aggregator values to Spark's metric sinks")
   @Default.Boolean(true)
   Boolean getEnableSparkMetricSinks();
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
index 67d0ab25e72..c94c4ec2a9e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -30,6 +30,7 @@
 import java.util.NoSuchElementException;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
@@ -65,6 +66,7 @@
     private final BoundedSource<T> source;
     private final SerializablePipelineOptions options;
     private final int numPartitions;
+    private final long bundleSize;
     private final String stepName;
     private final Accumulator<MetricsContainerStepMap> metricsAccum;
 
@@ -88,6 +90,7 @@ public Bounded(
       // ** the configuration "spark.default.parallelism" takes precedence over all of the above **
       this.numPartitions = sc.defaultParallelism();
       checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
+      this.bundleSize = options.get().as(SparkPipelineOptions.class).getBundleSize();
       this.stepName = stepName;
       this.metricsAccum = MetricsAccumulator.getInstance();
     }
@@ -96,19 +99,23 @@ public Bounded(
 
     @Override
     public Partition[] getPartitions() {
-      long desiredSizeBytes = DEFAULT_BUNDLE_SIZE;
       try {
-        desiredSizeBytes = source.getEstimatedSizeBytes(options.get()) / numPartitions;
-      } catch (Exception e) {
-        LOG.warn(
-            "Failed to get estimated bundle size for source {}, using default bundle "
-                + "size of {} bytes.",
-            source,
-            DEFAULT_BUNDLE_SIZE);
-      }
-      try {
-        List<? extends BoundedSource<T>> partitionedSources =
-            source.split(desiredSizeBytes, options.get());
+        List<? extends BoundedSource<T>> partitionedSources;
+        if (bundleSize > 0) {
+          partitionedSources = source.split(bundleSize, options.get());
+        } else {
+          long desiredSizeBytes = DEFAULT_BUNDLE_SIZE;
+          try {
+            desiredSizeBytes = source.getEstimatedSizeBytes(options.get()) / numPartitions;
+          } catch (Exception e) {
+            LOG.warn(
+                "Failed to get estimated bundle size for source {}, using default bundle "
+                    + "size of {} bytes.",
+                source,
+                DEFAULT_BUNDLE_SIZE);
+          }
+          partitionedSources = source.split(desiredSizeBytes, options.get());
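For context, a minimal usage sketch of the option added by this PR (the pipeline skeleton, class name, and the 64 MB value are illustrative, not part of the diff):

```java
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class BundleSizeUsageSketch {
  public static void main(String[] args) {
    // Split BoundedSources into ~64 MB bundles instead of relying on Spark's
    // defaultParallelism; equivalent to passing --bundleSize=67108864 on the command line.
    SparkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    options.setBundleSize(64 * 1024 * 1024L);

    Pipeline pipeline = Pipeline.create(options);
    // ... apply reads and transforms as usual ...
    pipeline.run().waitUntilFinish();
  }
}
```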

[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-10-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=151527=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151527
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 05/Oct/18 07:58
Start Date: 05/Oct/18 07:58
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #6181: [BEAM-4783] Add 
bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-427279202
 
 
   Run Spark ValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 151527)
Time Spent: 4.5h  (was: 4h 20m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Kyle Winkelman
>Assignee: Kyle Winkelman
>Priority: Major
> Fix For: 2.8.0
>
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-10-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=150800=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150800
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 03/Oct/18 16:06
Start Date: 03/Oct/18 16:06
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #6181: 
[BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r221656872
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
 ##
 @@ -99,6 +99,12 @@ public String create(PipelineOptions options) {
 
   void setCheckpointDurationMillis(Long durationMillis);
 
+  @Description("Bundle Size for splitting BoundedSources.")
+  @Default.Long(0)
+  long getBundleSize();
 
 Review comment:
   Can you make the get/set type not a primitive, for consistency with the other options?
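   That is, roughly (mirroring what was eventually merged):

```java
  @Description("Bundle Size for splitting BoundedSources.")
  @Default.Long(0)
  Long getBundleSize();

  void setBundleSize(Long value);
```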


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 150800)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Kyle Winkelman
>Assignee: Kyle Winkelman
>Priority: Major
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-10-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=150797=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150797
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 03/Oct/18 16:06
Start Date: 03/Oct/18 16:06
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #6181: 
[BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r221661790
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
 ##
 @@ -68,6 +69,32 @@
 
TranslationUtils.functionToFlatMapFunction(WindowingHelpers.windowFunction()), 
true);
   }
 
+  /**
+   * An implementation of {@link
+   * org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly} for the Spark runner.
+   * Used only when bundleSize is set in SparkPipelineOptions. Evaluating if the default Partitioner
+   * causes a reshuffle of the data.
+   */
+  @Experimental
 
 Review comment:
   Can you please remove this annotation from here and put it on 
`SparkPipelineOptions.setBundleSize`? The internal implementation of the runners does not 
yet have a stable API, so there is no issue there; it is the options that users can 
depend on, and what I wanted to highlight is that this knob may disappear if we find a 
better compromise or if we can make a good default without it.
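   That is, roughly (as in the merged version):

```java
  @Default.Long(0)
  Long getBundleSize();

  @Experimental
  void setBundleSize(Long value);
```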


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 150797)
Time Spent: 4h 10m  (was: 4h)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Kyle Winkelman
>Assignee: Kyle Winkelman
>Priority: Major
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-10-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=150798=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150798
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 03/Oct/18 16:06
Start Date: 03/Oct/18 16:06
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #6181: 
[BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r221672729
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
 ##
 @@ -68,6 +69,32 @@
 
TranslationUtils.functionToFlatMapFunction(WindowingHelpers.windowFunction()), 
true);
   }
 
+  /**
+   * An implementation of {@link
+   * org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly} for the Spark runner.
+   * Used only when bundleSize is set in SparkPipelineOptions. Evaluating if the default Partitioner
+   * causes a reshuffle of the data.
+   */
+  @Experimental
+  public static 
 
 Review comment:
   If the only difference with the other one is the partitioner, maybe we can pass it 
as a parameter instead (to avoid repeated code).
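   For illustration only, a self-contained Spark sketch of that suggestion (the helper 
name and the plain JavaPairRDD types are hypothetical; the real code operates on the 
byte-encoded pairs shown elsewhere in this PR):

```java
import java.util.Arrays;

import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class PartitionerAsParameterSketch {
  // Hypothetical helper: take the Partitioner as a parameter (null meaning "let Spark
  // choose the default partitioner") so the two near-identical groupByKeyOnly variants
  // can share one implementation.
  static <K, V> JavaPairRDD<K, Iterable<V>> groupByKeyOnly(
      JavaPairRDD<K, V> rdd, Partitioner partitioner) {
    return partitioner == null ? rdd.groupByKey() : rdd.groupByKey(partitioner);
  }

  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[2]", "partitioner-as-parameter");
    JavaPairRDD<String, Integer> pairs =
        sc.parallelizePairs(
            Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3)));

    // Current behavior: an explicit HashPartitioner sized by defaultParallelism.
    groupByKeyOnly(pairs, new HashPartitioner(sc.defaultParallelism())).collect();
    // Behavior under discussion: pass null and let Spark pick the default partitioner.
    groupByKeyOnly(pairs, null).collect();

    sc.stop();
  }
}
```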


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 150798)
Time Spent: 4h 20m  (was: 4h 10m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Kyle Winkelman
>Assignee: Kyle Winkelman
>Priority: Major
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-10-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=150799=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150799
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 03/Oct/18 16:06
Start Date: 03/Oct/18 16:06
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #6181: 
[BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r221660123
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
 ##
 @@ -99,6 +99,12 @@ public String create(PipelineOptions options) {
 
   void setCheckpointDurationMillis(Long durationMillis);
 
+  @Description("Bundle Size for splitting BoundedSources.")
 
 Review comment:
   Probably worth documenting the behavior (if set ... otherwise ...), and also 
mentioning that the spark.* variables need to be set in the environment (for 
dynamic allocation).
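   For reference, a sketch of the Spark-side settings that comment refers to 
(illustrative values; in practice these are usually passed via --conf on spark-submit 
rather than hard-coded):

```java
import org.apache.spark.SparkConf;

public class DynamicAllocationConfSketch {
  public static void main(String[] args) {
    SparkConf conf =
        new SparkConf()
            .setAppName("beam-on-spark")
            // Dynamic allocation itself, plus the external shuffle service it requires.
            .set("spark.dynamicAllocation.enabled", "true")
            .set("spark.shuffle.service.enabled", "true")
            // Optional bounds on how far Spark may scale executors up or down.
            .set("spark.dynamicAllocation.minExecutors", "2")
            .set("spark.dynamicAllocation.maxExecutors", "200");
    System.out.println(conf.toDebugString());
  }
}
```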


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 150799)
Time Spent: 4h 20m  (was: 4h 10m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Kyle Winkelman
>Assignee: Kyle Winkelman
>Priority: Major
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-10-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=150284=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150284
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 02/Oct/18 02:01
Start Date: 02/Oct/18 02:01
Worklog Time Spent: 10m 
  Work Description: amitsela commented on issue #6181: [BEAM-4783] Add 
bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-426121773
 
 
   As for `BEAM-5519`, I'll reply in the JIRA ticket.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 150284)
Time Spent: 4h  (was: 3h 50m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Kyle Winkelman
>Assignee: Kyle Winkelman
>Priority: Major
>  Time Spent: 4h
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-10-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=150282=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150282
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 02/Oct/18 02:00
Start Date: 02/Oct/18 02:00
Worklog Time Spent: 10m 
  Work Description: amitsela commented on issue #6181: [BEAM-4783] Add 
bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-426121506
 
 
   @iemejia @kyle-winkelman I agree that for batch, the effort to keep the 
"default partitioner" is meaningless, and so I believe it's fine to allow 
setting the bundle size via `PipelineOptions` (although the default should remain 
the default partitioner). 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 150282)
Time Spent: 3h 50m  (was: 3h 40m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Kyle Winkelman
>Assignee: Kyle Winkelman
>Priority: Major
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-10-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=150051=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150051
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 01/Oct/18 14:36
Start Date: 01/Oct/18 14:36
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #6181: [BEAM-4783] Add 
bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-425931207
 
 
   Run Spark ValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 150051)
Time Spent: 3h 40m  (was: 3.5h)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Affects Versions: 2.5.0
>Reporter: Kyle Winkelman
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Labels: newbie
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-09-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=148910=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-148910
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 27/Sep/18 21:21
Start Date: 27/Sep/18 21:21
Worklog Time Spent: 10m 
  Work Description: kyle-winkelman edited a comment on issue #6181: 
[BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-425201311
 
 
   Looking further into the StreamingTransformTranslator, I would like to pose 
a question. Why do we do the groupByKey followed by the updateStateByKey? It 
appears to be a giant waste in which we convert everything to bytes and back 
unnecessarily.
   
   The only thing it does is gather all the values for a key into an Iterable, 
but the updateStateByKey would also do that if it were given the chance.
   
   If we were to update the UpdateStateByKeyFunction to expect individual WindowedValues 
instead of Iterables of WindowedValues, I believe we could eliminate the call to 
groupByKey. What is happening now is that updateStateByKey wraps those values in a Seq, 
so currently we have either an empty Seq or a Seq with exactly one item, and that item 
is itself an Iterable that contains multiple items.
   
   **UPDATE: I have created a separate jira to look into this. 
[BEAM-5519](https://jira.apache.org/jira/browse/BEAM-5519)**


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 148910)
Time Spent: 3.5h  (was: 3h 20m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Affects Versions: 2.5.0
>Reporter: Kyle Winkelman
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Labels: newbie
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-09-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=148849=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-148849
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 27/Sep/18 18:44
Start Date: 27/Sep/18 18:44
Worklog Time Spent: 10m 
  Work Description: kyle-winkelman commented on issue #6181: [BEAM-4783] 
Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-425201311
 
 
   Looking further into the StreamingTransformTranslator, I would like to pose 
a question. Why do we do the groupByKey followed by the updateStateByKey? It 
appears to be a giant waste in which we convert everything to bytes and back 
unnecessarily.
   
   The only thing it does is gather all the values for a key into an Iterable, 
but the updateStateByKey would also do that if it were given the chance.
   
   If we were to update the UpdateStateByKeyFunction to expect individual WindowedValues 
instead of Iterables of WindowedValues, I believe we could eliminate the call to 
groupByKey. What is happening now is that updateStateByKey wraps those values in a Seq, 
so currently we have either an empty Seq or a Seq with exactly one item, and that item 
is itself an Iterable that contains multiple items.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 148849)
Time Spent: 3h 20m  (was: 3h 10m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Affects Versions: 2.5.0
>Reporter: Kyle Winkelman
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Labels: newbie
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-09-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=148801=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-148801
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 27/Sep/18 16:12
Start Date: 27/Sep/18 16:12
Worklog Time Spent: 10m 
  Work Description: kyle-winkelman commented on issue #6181: [BEAM-4783] 
Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-425152159
 
 
   I don't know how to proceed.
   
   I am convinced that in batch mode my proposal is the correct way to proceed. 
Another example of a silly interaction that occurs due to using defaultParallelism in 
SourceRDD is reading 2 different files. If one of the two files is a couple of orders 
of magnitude larger, you will need to allocate enough resources to the job to read the 
larger file, let's say n cores; the smaller file will then also get split into n 
pieces, which results in the smaller file being broken up into many very small bundles.
   
   The issue is I do not understand the repercussions this change will have on 
the streaming mode. Maybe we will need to have two different approaches to the 
groupBy logic, one for each mode.
   
   I am ok with this being experimental and only working if you supply the 
--bundleSize to the pipeline options. I would like an answer to the last 
question I asked to understand if in batch mode I can always use the new 
experimental groupByKeyOnlyDefaultPartitioner because I believe it will not 
cause a double shuffle in batch mode.
   
   Other than that I believe I need a code review and make sure everyone agrees 
with the approach.
   
   If this is not agreed upon I would hope someone could give me some advice on 
how to get the SparkRunner to work with dynamicAllocation. (Starting with 2 
cores and spinning up more if the files are large and are split into more 
bundles.)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 148801)
Time Spent: 3h 10m  (was: 3h)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Affects Versions: 2.5.0
>Reporter: Kyle Winkelman
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Labels: newbie
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-09-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=148556=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-148556
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 27/Sep/18 01:46
Start Date: 27/Sep/18 01:46
Worklog Time Spent: 10m 
  Work Description: aaltay commented on issue #6181: [BEAM-4783] Add 
bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-424928440
 
 
   What is the status of this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 148556)
Time Spent: 3h  (was: 2h 50m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Affects Versions: 2.5.0
>Reporter: Kyle Winkelman
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Labels: newbie
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-09-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=144402=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144402
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 14/Sep/18 18:43
Start Date: 14/Sep/18 18:43
Worklog Time Spent: 10m 
  Work Description: kyle-winkelman commented on issue #6181: [BEAM-4783] 
Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-421449558
 
 
   After looking further into 
[BEAM-1815](https://jira.apache.org/jira/browse/BEAM-1815) my understanding is 
that the double shuffle happened in streaming mode only because there is the 
groupByKey in GroupCombineFunctions.groupByKeyOnly followed by updateStateByKey 
in SparkGroupAlsoByWindowViaWindowSet.groupAlsoByWindow. This will not happen 
in batch mode so I believe we should always use groupByKey() without the 
hashpartitioner when in batch mode. That way we don't squash a large amount of 
data into a few partitions.
   
   If you agree I will change (in TransformTranslator):
   ```java
   JavaRDD<KV<K, Iterable<WindowedValue<V>>>> groupedByKey;
   if (context.getSerializableOptions().get().as(SparkPipelineOptions.class).getBundleSize() > 0) {
     groupedByKey =
         GroupCombineFunctions.groupByKeyOnlyDefaultPartitioner(inRDD, keyCoder, wvCoder);
   } else {
     groupedByKey = GroupCombineFunctions.groupByKeyOnly(inRDD, keyCoder, wvCoder);
   }
   ```
   to:
   ```java
   JavaRDD<KV<K, Iterable<WindowedValue<V>>>> groupedByKey =
       GroupCombineFunctions.groupByKeyOnlyDefaultPartitioner(inRDD, keyCoder, wvCoder);
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 144402)
Time Spent: 2h 50m  (was: 2h 40m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Affects Versions: 2.5.0
>Reporter: Kyle Winkelman
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Labels: newbie
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-09-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=144397=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144397
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 14/Sep/18 18:36
Start Date: 14/Sep/18 18:36
Worklog Time Spent: 10m 
  Work Description: kyle-winkelman commented on issue #6181: [BEAM-4783] 
Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-421447794
 
 
   In my use case we use spark.dynamicAllocation as a way to remove a knob 
(--num-executors) in our attempt to become knobless; when running in batch mode 
it will create the SourceRDDs and based on the number of partitions it will try 
to spin up that many executors. This completely backfires when the SourceRDD is 
partitioned based on defaultParallelism because that will now be equal to 2 
(default --num-executors).
   
   If you prefer we could prevent the bundleSize from being a knob and always 
use 64MB (Apache Hadoop default block size).
   
   I understand why streaming acts in this way, but for batch the users are going to 
have to guess how many executors they need. If they do not guess high enough, it is 
entirely possible to end up with >2GB of data in a partition 
(https://issues.apache.org/jira/browse/SPARK-6235). Starting at 64MB per partition does 
not eliminate this possibility, but it does reduce the chances. For example, if a user 
read a 10GB file with 1 executor it would fail if it ever tried to cache the partition, 
but by breaking it into 64MB partitions it has a chance of succeeding (depending on 
executor memory, etc.).
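   To make the arithmetic concrete (illustrative numbers, not a benchmark):

```java
public class BundleSizeMath {
  public static void main(String[] args) {
    long fileBytes = 10L * 1024 * 1024 * 1024; // a 10 GB input file
    long bundleBytes = 64L * 1024 * 1024;      // 64 MB, the Hadoop-style block size mentioned above

    // With defaultParallelism == 2 (the --num-executors default), each partition
    // would hold roughly 5 GB, well past Spark's ~2 GB partition limit (SPARK-6235).
    System.out.println("bytes per partition at parallelism 2: " + (fileBytes / 2));

    // Splitting by bundle size instead yields many small partitions that dynamic
    // allocation can scale executors against: ceil(10 GB / 64 MB) = 160.
    long partitions = (fileBytes + bundleBytes - 1) / bundleBytes;
    System.out.println("partitions at 64 MB bundles: " + partitions);
  }
}
```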


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 144397)
Time Spent: 2h 40m  (was: 2.5h)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Affects Versions: 2.5.0
>Reporter: Kyle Winkelman
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Labels: newbie
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-09-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=144336=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144336
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 14/Sep/18 16:34
Start Date: 14/Sep/18 16:34
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #6181: [BEAM-4783] Add 
bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-421414299
 
 
   I don't believe the Dataflow worker code is very useful for dynamic work 
rebalancing. Good dynamic work rebalancing will need support/signals from each 
runner. I believe there is a way to build a simple dynamic work rebalancing system 
that would work for all bounded splits by performing a limited amount of graph 
rewriting at pipeline submission time and then periodically splitting while running 
sources. You need support for a self loop within the runner to be able to get support 
for unbounded sources.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 144336)
Time Spent: 2.5h  (was: 2h 20m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Affects Versions: 2.5.0
>Reporter: Kyle Winkelman
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Labels: newbie
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-09-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=144329=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144329
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 14/Sep/18 16:15
Start Date: 14/Sep/18 16:15
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #6181: [BEAM-4783] Add 
bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-421408768
 
 
   I believe the source/worker plumbing for dynamic work rebalancing will be shared, 
which might be helpful in implementing it. The rest of the code is in the Dataflow 
service.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 144329)
Time Spent: 2h 20m  (was: 2h 10m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Affects Versions: 2.5.0
>Reporter: Kyle Winkelman
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Labels: newbie
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-09-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=144308=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144308
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 14/Sep/18 14:53
Start Date: 14/Sep/18 14:53
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #6181: [BEAM-4783] Add 
bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-421384240
 
 
   @chamikaramj will some of the DWR stuff be shared as part of the dataflow worker 
donation? Otherwise I don't see an easy way to support this (the only time we seriously 
thought about this with Amit, we concluded that it could require changes in the Spark 
code).
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 144308)
Time Spent: 2h 10m  (was: 2h)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Affects Versions: 2.5.0
>Reporter: Kyle Winkelman
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Labels: newbie
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-09-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=144297=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144297
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 14/Sep/18 14:26
Start Date: 14/Sep/18 14:26
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #6181: [BEAM-4783] Add 
bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-421375360
 
 
   Sorry missed this PR.
   
   Had a quick look.
   
   I think the proper solution is to introduce dynamic work rebalancing [1] to 
SparkRunner at some point. This way large bundles can be broken up into smaller bundles 
if there are more workers to process work. I agree with Ismaël that the proposed 
solution goes against Beam's no-knobs philosophy, but I understand why it might be 
needed until SparkRunner has support for dynamic work rebalancing. I'd suggest 
performing some experimentation to make sure that the new option helps before 
introducing it.
   
   [1] 
https://cloud.google.com/blog/products/gcp/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 144297)
Time Spent: 2h  (was: 1h 50m)

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
> 
>
> Key: BEAM-4783
> URL: https://issues.apache.org/jira/browse/BEAM-4783
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Affects Versions: 2.5.0
>Reporter: Kyle Winkelman
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Labels: newbie
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> When the spark-runner is used along with the configuration 
> spark.dynamicAllocation.enabled=true the SourceRDD does not detect this. It 
> then falls back to the value calculated in this description:
>   // when running on YARN/SparkDeploy it's the result of max(totalCores, 
> 2).
>   // when running on Mesos it's 8.
>   // when running local it's the total number of cores (local = 1, 
> local[N] = N,
>   // local[*] = estimation of the machine's cores).
>   // ** the configuration "spark.default.parallelism" takes precedence 
> over all of the above **
> So in most cases this default is quite small. This is an issue when using a 
> very large input file as it will only get split in half.
> I believe that when Dynamic Allocation is enabled the SourceRDD should use the 
> DEFAULT_BUNDLE_SIZE and possibly expose a SparkPipelineOptions option that 
> allows you to change this DEFAULT_BUNDLE_SIZE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-09-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=144271=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144271
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 14/Sep/18 12:51
Start Date: 14/Sep/18 12:51
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #6181: [BEAM-4783] Add 
bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-421348635
 
 
   Hi, I have not forgotten about this one (sorry for the delay). The default parallelism is calculated to use the ‘optimal’ number of cores, and I think it is a reasonable default (it maximizes core utilization, in particular for streaming). I prefer not to change this until we have a better way to replace the default value (if you have any suggestion on how to do this with the new approach, it is welcome).
   
   I want to include your changes, but not as the default for the moment; let’s say as an ‘alternative’ applied only if the user sets the bundle size (we have to document the partitioner change and mark this method @Experimental). This way we can evaluate whether double shuffles happen or not, and eventually whether the performance advantages justify making this behavior the default. WDYT?
   
   Beam’s design philosophy has always been to reduce ‘knobs’ to a minimum, but I understand that with Spark this might sometimes be needed.
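   Roughly, the opt-in behaviour described above could look like the sketch below; DEFAULT_BUNDLE_SIZE and the use of 0 as the "unset" sentinel are assumptions for illustration rather than the final implementation:

```java
import java.util.List;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

final class OptInSplitting {
  // Fallback used only when no bundle size is supplied and size estimation fails.
  private static final long DEFAULT_BUNDLE_SIZE = 64L * 1024L * 1024L;

  /** Split with the user-supplied bundle size if set (> 0), else keep the old estimate. */
  static <T> List<? extends BoundedSource<T>> split(
      BoundedSource<T> source, PipelineOptions options, long bundleSize, int numPartitions)
      throws Exception {
    long desiredSizeBytes;
    if (bundleSize > 0) {
      desiredSizeBytes = bundleSize; // explicit knob wins
    } else {
      try {
        desiredSizeBytes = source.getEstimatedSizeBytes(options) / numPartitions;
      } catch (Exception e) {
        desiredSizeBytes = DEFAULT_BUNDLE_SIZE;
      }
    }
    return source.split(desiredSizeBytes, options);
  }
}
```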




Issue Time Tracking
---

Worklog Id: (was: 144271)
Time Spent: 1h 50m  (was: 1h 40m)



[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-09-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=142173=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142173
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 07/Sep/18 13:01
Start Date: 07/Sep/18 13:01
Worklog Time Spent: 10m 
  Work Description: kyle-winkelman commented on a change in pull request 
#6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r215950437
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
 ##
 @@ -52,13 +50,11 @@
 .map(WindowingHelpers.unwindowFunction())
 .mapToPair(TranslationUtils.toPairFunction())
 .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder));
-// use a default parallelism HashPartitioner.
-Partitioner partitioner = new 
HashPartitioner(rdd.rdd().sparkContext().defaultParallelism());
 
 // using mapPartitions allows to preserve the partitioner
 // and avoid unnecessary shuffle downstream.
 return pairRDD
-.groupByKey(partitioner)
+.groupByKey()
 
 Review comment:
   If I am incorrect, how would we know whether to use the defaultParallelism or some other value? I don't think it would be appropriate to force a SourceRDD that may have had hundreds of partitions down to the defaultParallelism number of partitions, which may be quite small, as this could result in too much data in each partition.




Issue Time Tracking
---

Worklog Id: (was: 142173)
Time Spent: 1h 40m  (was: 1.5h)



[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-09-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=142171=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142171
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 07/Sep/18 12:50
Start Date: 07/Sep/18 12:50
Worklog Time Spent: 10m 
  Work Description: kyle-winkelman commented on a change in pull request 
#6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r215947597
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
 ##
 @@ -52,13 +50,11 @@
 .map(WindowingHelpers.unwindowFunction())
 .mapToPair(TranslationUtils.toPairFunction())
 .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder));
-// use a default parallelism HashPartitioner.
-Partitioner partitioner = new 
HashPartitioner(rdd.rdd().sparkContext().defaultParallelism());
 
 // using mapPartitions allows to preserve the partitioner
 // and avoid unnecessary shuffle downstream.
 return pairRDD
-.groupByKey(partitioner)
+.groupByKey()
 
 Review comment:
   To me it appears the double shuffling was solved by changing map and 
mapToPair to mapPartitions and mapPartitionsToPair, respectively. I don't think 
the explicit declaration of the HashPartitioner has any effect.
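   As an illustration of that idea (not the Beam code itself), here is a minimal Spark sketch in which a per-partition transform with preservesPartitioning=true keeps the existing partitioner, so a following groupByKey() can reuse it instead of shuffling again; all names and data are made up:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class PreservePartitionerDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("preserve-partitioner");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      JavaPairRDD<String, Integer> partitioned =
          jsc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)))
              .partitionBy(new HashPartitioner(4)); // one shuffle here

      // Per-partition transform that keeps keys intact; preservesPartitioning=true
      // tells Spark the HashPartitioner is still valid afterwards.
      JavaPairRDD<String, Integer> bumped =
          partitioned.mapPartitionsToPair(
              iter -> {
                List<Tuple2<String, Integer>> out = new ArrayList<>();
                iter.forEachRemaining(kv -> out.add(new Tuple2<>(kv._1(), kv._2() + 1)));
                return out.iterator();
              },
              true);

      // Because the partitioner was preserved, this groupByKey() reuses it and
      // does not need a second shuffle.
      System.out.println(bumped.groupByKey().partitioner());
    }
  }
}
```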




Issue Time Tracking
---

Worklog Id: (was: 142171)
Time Spent: 1.5h  (was: 1h 20m)



[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-09-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=142165=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142165
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 07/Sep/18 12:36
Start Date: 07/Sep/18 12:36
Worklog Time Spent: 10m 
  Work Description: kyle-winkelman commented on a change in pull request 
#6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r215943999
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
 ##
 @@ -80,35 +81,16 @@ public Bounded(
   super(sc, NIL, JavaSparkContext$.MODULE$.fakeClassTag());
   this.source = source;
   this.options = options;
-  // the input parallelism is determined by Spark's scheduler backend.
-  // when running on YARN/SparkDeploy it's the result of max(totalCores, 
2).
-  // when running on Mesos it's 8.
-  // when running local it's the total number of cores (local = 1, 
local[N] = N,
-  // local[*] = estimation of the machine's cores).
-  // ** the configuration "spark.default.parallelism" takes precedence 
over all of the above **
-  this.numPartitions = sc.defaultParallelism();
-  checkArgument(this.numPartitions > 0, "Number of partitions must be 
greater than zero.");
+  this.bundleSize = 
options.get().as(SparkPipelineOptions.class).getBundleSize();
+  checkArgument(this.bundleSize > 0, "Bundle size must be greater than 
zero.");
   this.stepName = stepName;
   this.metricsAccum = MetricsAccumulator.getInstance();
 }
 
-private static final long DEFAULT_BUNDLE_SIZE = 64L * 1024L * 1024L;
-
 @Override
 public Partition[] getPartitions() {
-  long desiredSizeBytes = DEFAULT_BUNDLE_SIZE;
-  try {
-desiredSizeBytes = source.getEstimatedSizeBytes(options.get()) / 
numPartitions;
-  } catch (Exception e) {
-LOG.warn(
-"Failed to get estimated bundle size for source {}, using default 
bundle "
-+ "size of {} bytes.",
-source,
-DEFAULT_BUNDLE_SIZE);
-  }
   try {
-List> partitionedSources =
-source.split(desiredSizeBytes, options.get());
+List> partitionedSources = 
source.split(bundleSize, options.get());
 
 Review comment:
   The knob-based approach falls more in line with what Spark users would expect. If they write native Spark with an HDFS input like `sc.textFile("hdfs:///input/war-and-peace.txt")`, they wouldn't expect the file to be split up differently depending on the number of cores they run with.
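   For comparison, a rough sketch of that native Spark expectation (the path reuses the example above; the block-size arithmetic in the comment is illustrative, not measured):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class NativeSplitDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("native-split-demo");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      JavaRDD<String> lines = jsc.textFile("hdfs:///input/war-and-peace.txt");
      // The partition count tracks the file's HDFS blocks (e.g. a 1 GB file with
      // 128 MB blocks gives roughly 8 partitions), not the number of executor cores.
      System.out.println(lines.getNumPartitions());
    }
  }
}
```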




Issue Time Tracking
---

Worklog Id: (was: 142165)
Time Spent: 1h 20m  (was: 1h 10m)



[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-09-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=142121=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142121
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 07/Sep/18 10:14
Start Date: 07/Sep/18 10:14
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #6181: 
[BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r215912675
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
 ##
 @@ -52,13 +50,11 @@
 .map(WindowingHelpers.unwindowFunction())
 .mapToPair(TranslationUtils.toPairFunction())
 .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder));
-// use a default parallelism HashPartitioner.
-Partitioner partitioner = new 
HashPartitioner(rdd.rdd().sparkContext().defaultParallelism());
 
 // using mapPartitions allows to preserve the partitioner
 // and avoid unnecessary shuffle downstream.
 return pairRDD
-.groupByKey(partitioner)
+.groupByKey()
 
 Review comment:
   There is a previous issue on double shuffling that was solved by explicitly passing the partitioner, [BEAM-1815](https://jira.apache.org/jira/browse/BEAM-1815). I am afraid this change has the side effect of bringing that issue back.




Issue Time Tracking
---

Worklog Id: (was: 142121)
Time Spent: 1h 10m  (was: 1h)



[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-09-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=142120=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142120
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 07/Sep/18 10:14
Start Date: 07/Sep/18 10:14
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #6181: 
[BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r215893873
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
 ##
 @@ -80,35 +81,16 @@ public Bounded(
   super(sc, NIL, JavaSparkContext$.MODULE$.fakeClassTag());
   this.source = source;
   this.options = options;
-  // the input parallelism is determined by Spark's scheduler backend.
-  // when running on YARN/SparkDeploy it's the result of max(totalCores, 
2).
-  // when running on Mesos it's 8.
-  // when running local it's the total number of cores (local = 1, 
local[N] = N,
-  // local[*] = estimation of the machine's cores).
-  // ** the configuration "spark.default.parallelism" takes precedence 
over all of the above **
-  this.numPartitions = sc.defaultParallelism();
-  checkArgument(this.numPartitions > 0, "Number of partitions must be 
greater than zero.");
+  this.bundleSize = 
options.get().as(SparkPipelineOptions.class).getBundleSize();
+  checkArgument(this.bundleSize > 0, "Bundle size must be greater than 
zero.");
   this.stepName = stepName;
   this.metricsAccum = MetricsAccumulator.getInstance();
 }
 
-private static final long DEFAULT_BUNDLE_SIZE = 64L * 1024L * 1024L;
-
 @Override
 public Partition[] getPartitions() {
-  long desiredSizeBytes = DEFAULT_BUNDLE_SIZE;
-  try {
-desiredSizeBytes = source.getEstimatedSizeBytes(options.get()) / 
numPartitions;
-  } catch (Exception e) {
-LOG.warn(
-"Failed to get estimated bundle size for source {}, using default 
bundle "
-+ "size of {} bytes.",
-source,
-DEFAULT_BUNDLE_SIZE);
-  }
   try {
-List> partitionedSources =
-source.split(desiredSizeBytes, options.get());
+List> partitionedSources = 
source.split(bundleSize, options.get());
 
 Review comment:
   What is the argument for changing the default approach to a more knob-based one? Maybe it is better to preserve the default and only override it if this is parameterized.




Issue Time Tracking
---

Worklog Id: (was: 142120)
Time Spent: 1h  (was: 50m)



[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-09-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=141831=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141831
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 06/Sep/18 16:16
Start Date: 06/Sep/18 16:16
Worklog Time Spent: 10m 
  Work Description: kyle-winkelman commented on issue #6181: [BEAM-4783] 
Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-419153980
 
 
   ping @chamikaramj 




Issue Time Tracking
---

Worklog Id: (was: 141831)
Time Spent: 50m  (was: 40m)



[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-08-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=138431=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-138431
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 27/Aug/18 15:17
Start Date: 27/Aug/18 15:17
Worklog Time Spent: 10m 
  Work Description: kyle-winkelman commented on a change in pull request 
#6181: [BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#discussion_r212977881
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
 ##
 @@ -52,13 +50,11 @@
 .map(WindowingHelpers.unwindowFunction())
 .mapToPair(TranslationUtils.toPairFunction())
 .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder));
-// use a default parallelism HashPartitioner.
-Partitioner partitioner = new 
HashPartitioner(rdd.rdd().sparkContext().defaultParallelism());
 
 // using mapPartitions allows to preserve the partitioner
 // and avoid unnecessary shuffle downstream.
 return pairRDD
-.groupByKey(partitioner)
+.groupByKey()
 
 Review comment:
   This will cause the 
[Partitioner.defaultPartitioner](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L62)
 to be used. When called on a SourceRDD this should be a HashPartitioner with 
the number of partitions equal to the number of splits created by the 
bundleSize. When called on a SourceDStream this should be a HashPartitioner 
with the number of partitions equal to the defaultParallelism.
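   A small standalone sketch of that defaultPartitioner behaviour, assuming spark.default.parallelism is not explicitly set (the 200-partition toy RDD simply stands in for a finely split bounded source):

```java
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class DefaultPartitionerDemo {
  public static void main(String[] args) {
    // spark.default.parallelism is deliberately left unset so that
    // Partitioner.defaultPartitioner falls back to the parent RDD's partition count.
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("default-partitioner");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      JavaPairRDD<String, Integer> pairs =
          jsc.parallelizePairs(
              Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)), 200);
      // Without an explicit partitioner, groupByKey() ends up with a HashPartitioner
      // sized to the upstream partition count (200 here), not to defaultParallelism.
      System.out.println(pairs.groupByKey().getNumPartitions());
    }
  }
}
```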




Issue Time Tracking
---

Worklog Id: (was: 138431)
Time Spent: 40m  (was: 0.5h)



[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=137965=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137965
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 24/Aug/18 20:43
Start Date: 24/Aug/18 20:43
Worklog Time Spent: 10m 
  Work Description: apilloud commented on issue #6181: [BEAM-4783] Add 
bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-415877315
 
 
   Java PreCommits are failing due to unused imports. Can you run `./gradlew 
:beam-runners-spark:spotlessApply`?
   
   ping @chamikaramj 




Issue Time Tracking
---

Worklog Id: (was: 137965)
Time Spent: 0.5h  (was: 20m)



[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-08-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=136159=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-136159
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 20/Aug/18 16:12
Start Date: 20/Aug/18 16:12
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #6181: [BEAM-4783] Add 
bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181#issuecomment-414373896
 
 
   I've asked @chamikaramj to take a look, and he said he would. Just know this PR is being followed up on.




Issue Time Tracking
---

Worklog Id: (was: 136159)
Time Spent: 20m  (was: 10m)



[jira] [Work logged] (BEAM-4783) Spark SourceRDD Not Designed With Dynamic Allocation In Mind

2018-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4783?focusedWorklogId=132486=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-132486
 ]

ASF GitHub Bot logged work on BEAM-4783:


Author: ASF GitHub Bot
Created on: 08/Aug/18 17:24
Start Date: 08/Aug/18 17:24
Worklog Time Spent: 10m 
  Work Description: kyle-winkelman opened a new pull request #6181: 
[BEAM-4783] Add bundleSize for splitting BoundedSources.
URL: https://github.com/apache/beam/pull/6181
 
 
   When running a workflow on the Spark Runner with BoundedSources, files are only split according to the defaultParallelism. When accessing HDFS in native Spark, partitioning defaults to the HDFS block size (my understanding). I believe there should be a configurable bundleSize for splitting BoundedSources, so that a larger file corresponds to more splits rather than to larger splits, as happens now.
   
   Spark Runner Code Owner: @jbonofre 
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   It will help us expedite review of your Pull Request if you tag someone 
(e.g. `@username`) to look at it.
   




Issue Time Tracking
---

Worklog Id: (was: 132486)
Time Spent: 10m
Remaining Estimate: 0h

> Spark SourceRDD Not Designed With Dynamic Allocation In Mind
>