[jira] [Commented] (BEAM-302) Add Scio Scala DSL to Beam

2017-01-24 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836540#comment-15836540
 ] 

Amit Sela commented on BEAM-302:


[~davor] where are we with Scio integration?

> Add Scio Scala DSL to Beam
> --
>
> Key: BEAM-302
> URL: https://issues.apache.org/jira/browse/BEAM-302
> Project: Beam
>  Issue Type: Wish
>  Components: sdk-ideas
>Reporter: Jean-Baptiste Onofré
>Assignee: Neville Li
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-302) Add Scio Scala DSL to Beam

2017-01-24 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836535#comment-15836535
 ] 

Amit Sela commented on BEAM-302:


Scio currently supports the Dataflow SDK (in a sense, Beam's predecessor), and 
once it supports Beam it will be able to interact with any runner that supports 
the Java SDK, since Scio is a Scala DSL running on top of the Java SDK. 

> Add Scio Scala DSL to Beam
> --
>
> Key: BEAM-302
> URL: https://issues.apache.org/jira/browse/BEAM-302
> Project: Beam
>  Issue Type: Wish
>  Components: sdk-ideas
>Reporter: Jean-Baptiste Onofré
>Assignee: Neville Li
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (BEAM-1294) Long running UnboundedSource Readers via Broadcasts

2017-01-21 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1294:
---

 Summary: Long running UnboundedSource Readers via Broadcasts
 Key: BEAM-1294
 URL: https://issues.apache.org/jira/browse/BEAM-1294
 Project: Beam
  Issue Type: Improvement
  Components: runner-spark
Reporter: Amit Sela
Assignee: Amit Sela


When reading from an UnboundedSource, the current implementation causes each 
split to create a new Reader every micro-batch.

As long as the overhead of creating a reader is relatively low this is 
tolerable (though I'd still be happy to get rid of it), but where the creation 
overhead is large it becomes unreasonable, forcing large batches.

One way to solve this could be to create a pool of lazy-init readers to serve 
each executor, maybe via Broadcast variables. 
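
For illustration only, a minimal sketch of the pooling idea ({{ReaderPool}} and 
its keying are hypothetical, not runner code). A Broadcast would carry the 
serializable source splits; the pool itself lives as a per-executor-JVM 
singleton so reader creation happens once per split instead of once per batch:

{code:java}
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

public final class ReaderPool {

  // One long-lived reader per split, per executor JVM, created lazily on
  // first use and reused across micro-batches. Keying on the split itself
  // is illustrative; a stable split id would work too.
  private static final Map<UnboundedSource<?, ?>, UnboundedSource.UnboundedReader<?>>
      READERS = new ConcurrentHashMap<>();

  private ReaderPool() {}

  @SuppressWarnings("unchecked")
  public static <T> UnboundedSource.UnboundedReader<T> readerFor(
      UnboundedSource<T, ?> split, PipelineOptions options) {
    return (UnboundedSource.UnboundedReader<T>)
        READERS.computeIfAbsent(split, s -> {
          try {
            // The expensive creation happens once per split instead of per batch.
            return split.createReader(options, null);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        });
  }
}
{code}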



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (BEAM-1291) KafkaIO: don't log warning in offset fetcher while closing the reader.

2017-01-21 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1291.
-
   Resolution: Fixed
Fix Version/s: 0.5.0

> KafkaIO: don't log warning in offset fetcher while closing the reader.
> --
>
> Key: BEAM-1291
> URL: https://issues.apache.org/jira/browse/BEAM-1291
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Raghu Angadi
>Assignee: Raghu Angadi
>Priority: Minor
> Fix For: 0.5.0
>
>
> When the KafkaIO reader is closed, it wakes up two background threads used 
> for reading from Kafka. The wakeup notification results in an exception. One 
> of the threads logs a warning with full stacktrace even though it is part of 
> normal operation. It should not.
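
The shape of the fix could look like the following hedged sketch (not KafkaIO's 
actual code; {{OffsetFetcherLoop}} is a hypothetical stand-in for the background 
thread): a {{WakeupException}} raised while deliberately closing is expected 
and should not be logged as a warning with a stacktrace.

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class OffsetFetcherLoop implements Runnable {
  private static final Logger LOG = LoggerFactory.getLogger(OffsetFetcherLoop.class);

  private final KafkaConsumer<byte[], byte[]> consumer;
  private final AtomicBoolean closed = new AtomicBoolean(false);

  OffsetFetcherLoop(KafkaConsumer<byte[], byte[]> consumer) {
    this.consumer = consumer;
  }

  @Override
  public void run() {
    try {
      while (!closed.get()) {
        consumer.poll(100); // fetch offsets/records in the background
      }
    } catch (WakeupException e) {
      // Expected when shutdown() interrupted a blocking poll(); only log a
      // warning (with stacktrace) if the wakeup was NOT part of closing.
      if (!closed.get()) {
        LOG.warn("Unexpected consumer wakeup", e);
      }
    } finally {
      consumer.close();
    }
  }

  void shutdown() {
    closed.set(true);
    consumer.wakeup(); // safe to call from another thread
  }
}
{code}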



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (BEAM-1214) fail to run on SparkRunner with VerifyError

2017-01-19 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela closed BEAM-1214.
---
   Resolution: Fixed
Fix Version/s: 0.5.0

> fail to run on SparkRunner with VerifyError
> ---
>
> Key: BEAM-1214
> URL: https://issues.apache.org/jira/browse/BEAM-1214
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Affects Versions: 0.3.0-incubating
>Reporter: Xu Mingmin
>Assignee: Kobi Salant
> Fix For: 0.5.0
>
>
> I see this exception when running with SparkRunner locally. I fixed it in my 
> environment by changing jackson-module-scala_2.10 to version 2.7.2. 
> <dependency>
>   <groupId>com.fasterxml.jackson.module</groupId>
>   <artifactId>jackson-module-scala_2.10</artifactId>
>   <version>2.7.2</version>
> </dependency>
> Here's the exception stack trace:
> Exception in thread "main" java.lang.VerifyError: class 
> com.fasterxml.jackson.module.scala.ser.ScalaIteratorSerializer overrides 
> final method 
> withResolved.(Lcom/fasterxml/jackson/databind/BeanProperty;Lcom/fasterxml/jackson/databind/jsontype/TypeSerializer;Lcom/fasterxml/jackson/databind/JsonSerializer;)Lcom/fasterxml/jackson/databind/ser/std/AsArraySerializerBase;
>   at java.lang.ClassLoader.defineClass1(Native Method)
>   at java.lang.ClassLoader.defineClass(Unknown Source)
>   at java.security.SecureClassLoader.defineClass(Unknown Source)
>   at java.net.URLClassLoader.defineClass(Unknown Source)
>   at java.net.URLClassLoader.access$100(Unknown Source)
>   at java.net.URLClassLoader$1.run(Unknown Source)
>   at java.net.URLClassLoader$1.run(Unknown Source)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(Unknown Source)
>   at java.lang.ClassLoader.loadClass(Unknown Source)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
>   at java.lang.ClassLoader.loadClass(Unknown Source)
>   at 
> com.fasterxml.jackson.module.scala.ser.IteratorSerializerModule$class.$init$(IteratorSerializerModule.scala:70)
>   at 
> com.fasterxml.jackson.module.scala.DefaultScalaModule.<init>(DefaultScalaModule.scala:19)
>   at 
> com.fasterxml.jackson.module.scala.DefaultScalaModule$.<init>(DefaultScalaModule.scala:35)
>   at 
> com.fasterxml.jackson.module.scala.DefaultScalaModule$.<clinit>(DefaultScalaModule.scala)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:81)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>   at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
>   at 
> org.apache.spark.api.java.JavaRDDLike$class.mapPartitions(JavaRDDLike.scala:161)
>   at 
> org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitions(JavaRDDLike.scala:46)
>   at 
> org.apache.beam.runners.spark.translation.TransformTranslator$7.evaluate(TransformTranslator.java:262)
>   at 
> org.apache.beam.runners.spark.translation.TransformTranslator$7.evaluate(TransformTranslator.java:248)
>   at 
> org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:267)
>   at 
> org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:252)
>   at 
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:224)
>   at 
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:101)
>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
>   at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:167)
>   at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:75)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-648) Persist and restore Aggregator values in case of recovery from failure

2017-01-19 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela updated BEAM-648:
---
Assignee: Aviem Zur  (was: Jean-Baptiste Onofré)

> Persist and restore Aggregator values in case of recovery from failure
> --
>
> Key: BEAM-648
> URL: https://issues.apache.org/jira/browse/BEAM-648
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Aviem Zur
>
> While Aggregators are fault-tolerant (pending 
> https://github.com/apache/incubator-beam/pull/909), they will recover into 
> their initial value. 
> The SparkRunner should persist the values to durable storage and recover from 
> the persisted state.
> One way to go would be to persist via Spark's listeners, for example 
> "onBatchCompleted".
> For more details see SPARK-5206.
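
As a rough sketch of the listener approach (class name and checkpoint path are 
hypothetical, and durable storage such as HDFS would be used in practice), 
assuming Spark 1.x's {{JavaStreamingListener}} and {{Accumulator}} APIs:

{code:java}
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.spark.Accumulator;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;

public class AggregatorCheckpointListener extends JavaStreamingListener {

  private final Accumulator<?> aggregatorsAccumulator;
  private final Path checkpointFile;

  public AggregatorCheckpointListener(Accumulator<?> accumulator, String checkpointDir) {
    this.aggregatorsAccumulator = accumulator;
    this.checkpointFile = Paths.get(checkpointDir, "aggregators.bin");
  }

  @Override
  public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
    // Persist the driver-side accumulator value after each completed batch so
    // a recovered job can restore it instead of falling back to the initial value.
    try (ObjectOutputStream out =
        new ObjectOutputStream(Files.newOutputStream(checkpointFile))) {
      out.writeObject(aggregatorsAccumulator.value());
    } catch (Exception e) {
      throw new RuntimeException("Failed to checkpoint aggregator values", e);
    }
  }
}
{code}

The listener would be registered through the streaming context's listener 
mechanism, and a recovering driver would read the persisted value back before 
re-creating the accumulator.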



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (BEAM-1214) fail to run on SparkRunner with VerifyError

2017-01-19 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15829944#comment-15829944
 ] 

Amit Sela edited comment on BEAM-1214 at 1/19/17 1:56 PM:
--

Sorry for taking so long to respond, I was away and somehow missed this while 
catching up.
[~mingmxu] could you share your pom, please? I'm wondering whether you depend 
only on the Spark runner or on other Beam modules as well.


was (Author: amitsela):
[~mingmxu] could you share your pom, please? I'm wondering whether you depend 
only on the Spark runner or on other Beam modules as well.

> fail to run on SparkRunner with VerifyError
> ---
>
> Key: BEAM-1214
> URL: https://issues.apache.org/jira/browse/BEAM-1214
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Affects Versions: 0.3.0-incubating
>Reporter: Xu Mingmin
>Assignee: Amit Sela
>
> I see this exception when running with SparkRunner locally. I fixed it in my 
> environment by changing jackson-module-scala_2.10 to version 2.7.2. 
> <dependency>
>   <groupId>com.fasterxml.jackson.module</groupId>
>   <artifactId>jackson-module-scala_2.10</artifactId>
>   <version>2.7.2</version>
> </dependency>
> Here's the exception stack trace:
> Exception in thread "main" java.lang.VerifyError: class 
> com.fasterxml.jackson.module.scala.ser.ScalaIteratorSerializer overrides 
> final method 
> withResolved.(Lcom/fasterxml/jackson/databind/BeanProperty;Lcom/fasterxml/jackson/databind/jsontype/TypeSerializer;Lcom/fasterxml/jackson/databind/JsonSerializer;)Lcom/fasterxml/jackson/databind/ser/std/AsArraySerializerBase;
>   at java.lang.ClassLoader.defineClass1(Native Method)
>   at java.lang.ClassLoader.defineClass(Unknown Source)
>   at java.security.SecureClassLoader.defineClass(Unknown Source)
>   at java.net.URLClassLoader.defineClass(Unknown Source)
>   at java.net.URLClassLoader.access$100(Unknown Source)
>   at java.net.URLClassLoader$1.run(Unknown Source)
>   at java.net.URLClassLoader$1.run(Unknown Source)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(Unknown Source)
>   at java.lang.ClassLoader.loadClass(Unknown Source)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
>   at java.lang.ClassLoader.loadClass(Unknown Source)
>   at 
> com.fasterxml.jackson.module.scala.ser.IteratorSerializerModule$class.$init$(IteratorSerializerModule.scala:70)
>   at 
> com.fasterxml.jackson.module.scala.DefaultScalaModule.<init>(DefaultScalaModule.scala:19)
>   at 
> com.fasterxml.jackson.module.scala.DefaultScalaModule$.<init>(DefaultScalaModule.scala:35)
>   at 
> com.fasterxml.jackson.module.scala.DefaultScalaModule$.<clinit>(DefaultScalaModule.scala)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:81)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>   at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
>   at 
> org.apache.spark.api.java.JavaRDDLike$class.mapPartitions(JavaRDDLike.scala:161)
>   at 
> org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitions(JavaRDDLike.scala:46)
>   at 
> org.apache.beam.runners.spark.translation.TransformTranslator$7.evaluate(TransformTranslator.java:262)
>   at 
> org.apache.beam.runners.spark.translation.TransformTranslator$7.evaluate(TransformTranslator.java:248)
>   at 
> org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:267)
>   at 
> org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:252)
>   at 
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:224)
>   at 
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:101)
>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
>   at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:167)
>   at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:75)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-1214) fail to run on SparkRunner with VerifyError

2017-01-19 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15829944#comment-15829944
 ] 

Amit Sela commented on BEAM-1214:
-

[~mingmxu] could you share your pom, please? I'm wondering whether you depend 
only on the Spark runner or on other Beam modules as well.

> fail to run on SparkRunner with VerifyError
> ---
>
> Key: BEAM-1214
> URL: https://issues.apache.org/jira/browse/BEAM-1214
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Affects Versions: 0.3.0-incubating
>Reporter: Xu Mingmin
>Assignee: Amit Sela
>
> I see this exception when running with SparkRunner locally. I fixed it in my 
> environment by changing jackson-module-scala_2.10 to version 2.7.2. 
> <dependency>
>   <groupId>com.fasterxml.jackson.module</groupId>
>   <artifactId>jackson-module-scala_2.10</artifactId>
>   <version>2.7.2</version>
> </dependency>
> Here's the exception stack trace:
> Exception in thread "main" java.lang.VerifyError: class 
> com.fasterxml.jackson.module.scala.ser.ScalaIteratorSerializer overrides 
> final method 
> withResolved.(Lcom/fasterxml/jackson/databind/BeanProperty;Lcom/fasterxml/jackson/databind/jsontype/TypeSerializer;Lcom/fasterxml/jackson/databind/JsonSerializer;)Lcom/fasterxml/jackson/databind/ser/std/AsArraySerializerBase;
>   at java.lang.ClassLoader.defineClass1(Native Method)
>   at java.lang.ClassLoader.defineClass(Unknown Source)
>   at java.security.SecureClassLoader.defineClass(Unknown Source)
>   at java.net.URLClassLoader.defineClass(Unknown Source)
>   at java.net.URLClassLoader.access$100(Unknown Source)
>   at java.net.URLClassLoader$1.run(Unknown Source)
>   at java.net.URLClassLoader$1.run(Unknown Source)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(Unknown Source)
>   at java.lang.ClassLoader.loadClass(Unknown Source)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
>   at java.lang.ClassLoader.loadClass(Unknown Source)
>   at 
> com.fasterxml.jackson.module.scala.ser.IteratorSerializerModule$class.$init$(IteratorSerializerModule.scala:70)
>   at 
> com.fasterxml.jackson.module.scala.DefaultScalaModule.<init>(DefaultScalaModule.scala:19)
>   at 
> com.fasterxml.jackson.module.scala.DefaultScalaModule$.<init>(DefaultScalaModule.scala:35)
>   at 
> com.fasterxml.jackson.module.scala.DefaultScalaModule$.<clinit>(DefaultScalaModule.scala)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:81)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>   at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
>   at 
> org.apache.spark.api.java.JavaRDDLike$class.mapPartitions(JavaRDDLike.scala:161)
>   at 
> org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitions(JavaRDDLike.scala:46)
>   at 
> org.apache.beam.runners.spark.translation.TransformTranslator$7.evaluate(TransformTranslator.java:262)
>   at 
> org.apache.beam.runners.spark.translation.TransformTranslator$7.evaluate(TransformTranslator.java:248)
>   at 
> org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:267)
>   at 
> org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:252)
>   at 
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:224)
>   at 
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>   at 
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:101)
>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
>   at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:167)
>   at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:75)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (BEAM-979) ConcurrentModificationException exception after hours of running

2017-01-19 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-979.

   Resolution: Fixed
Fix Version/s: 0.5.0

Resolved by BEAM-1177

> ConcurrentModificationException exception after hours of running
> 
>
> Key: BEAM-979
> URL: https://issues.apache.org/jira/browse/BEAM-979
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Stas Levin
>Assignee: Amit Sela
> Fix For: 0.5.0
>
>
> {code}
>   
> User class threw exception: org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 1 in stage 4483.0 failed 4 times, most recent failure: 
> Lost task 1.3 in stage 4483.0 (TID 44548, .com): 
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
> at java.util.ArrayList$Itr.next(ArrayList.java:851)
> at 
> com.xxx.relocated.com.google.common.collect.Iterators$3.next(Iterators.java:177)
> at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
> at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (BEAM-921) Register Coders and Sources to serialize with JavaSerializer

2017-01-19 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-921.

   Resolution: Fixed
Fix Version/s: 0.5.0

> Register Coders and Sources to serialize with JavaSerializer
> 
>
> Key: BEAM-921
> URL: https://issues.apache.org/jira/browse/BEAM-921
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Aviem Zur
> Fix For: 0.5.0
>
>
> Register all implementations of {{Source}} and {{Coder}} in 
> {{BeamSparkRunnerRegistrator}} to use Spark's {{JavaSerializer}}.
> This will provide stability, since Sources and Coders are not guaranteed to 
> be Kryo-serializable.
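
A minimal sketch of what the registrator could look like (the two registrations 
shown are illustrative; since Kryo registers concrete classes, the real change 
registers every discovered Source/Coder subclass, which BEAM-1146 later 
revisits). Inside a {{KryoRegistrator}}, the natural tool is Kryo's own 
{{JavaSerializer}}, which delegates to Java serialization:

{code:java}
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.Source;
import org.apache.spark.serializer.KryoRegistrator;

public class BeamSparkRunnerRegistrator implements KryoRegistrator {
  @Override
  public void registerClasses(Kryo kryo) {
    // Route Sources and Coders through Java serialization: they are
    // guaranteed Serializable but not guaranteed to be Kryo-serializable.
    // Kryo registration is per concrete class, so in practice every
    // Source/Coder subclass found on the classpath gets registered this way.
    kryo.register(Source.class, new JavaSerializer());
    kryo.register(Coder.class, new JavaSerializer());
  }
}
{code}

Spark picks the registrator up via the {{spark.kryo.registrator}} configuration 
property.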



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-849) Redesign PipelineResult API

2017-01-12 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15820729#comment-15820729
 ] 

Amit Sela commented on BEAM-849:


So in the "spirit" of the unified model, would you agree that bounded pipelines 
should also explicitly call {{cancel()}} after {{waitUntilFinish(Duration)}}, 
and so should return the {{State}} as it is rather than {{null}}?
Also, it might make sense not to support {{waitUntilFinish()}} in unbounded 
pipelines, since it would wait "forever", no?
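
As a hedged usage sketch of that contract (not a settled API decision), a 
bounded pipeline would then look roughly like:

{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.joda.time.Duration;

public class RunWithTimeout {
  public static PipelineResult.State runBounded(Pipeline p) throws Exception {
    PipelineResult result = p.run();
    // Under the proposed contract, a timeout returns the pipeline's actual
    // State (not null), and the caller cancels explicitly if still running.
    PipelineResult.State state = result.waitUntilFinish(Duration.standardMinutes(10));
    if (state == null || !state.isTerminal()) {
      result.cancel();
    }
    return result.getState();
  }
}
{code}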

> Redesign PipelineResult API
> ---
>
> Key: BEAM-849
> URL: https://issues.apache.org/jira/browse/BEAM-849
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Pei He
>
> Current state: 
> Jira https://issues.apache.org/jira/browse/BEAM-443 addresses 
> waitUntilFinish() and cancel(). 
> However, there are additional work around PipelineResult: 
> need clearly defined contract and verification across all runners 
> need to revisit how to handle metrics/aggregators 
> need to be able to get logs



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (BEAM-1256) SparkPipelineResult.StreamingMode should not return null on awaitTermination()

2017-01-10 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1256:
---

 Summary: SparkPipelineResult.StreamingMode should not return null 
on awaitTermination()
 Key: BEAM-1256
 URL: https://issues.apache.org/jira/browse/BEAM-1256
 Project: Beam
  Issue Type: Bug
  Components: runner-spark
Reporter: Amit Sela
Assignee: Amit Sela
Priority: Minor


Currently, {{SparkPipelineResult.StreamingMode}} will return {{null}} if 
{{JavaStreamingContext#awaitTerminationOrTimeout(Long)}} returns {{false}}.
This is wrong because a timed-out wait will always return {{false}} (and so 
{{null}}): Spark's {{ContextWaiter}} only returns {{true}} if the context was 
stopped, and that only happens when {{StreamingContext#stop()}} is called.

Following the Spark code:
awaitTermination returns the [state of 
stopped|https://github.com/apache/spark/blob/branch-1.6/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala#L74],
but it can only be stopped via 
[notifyStopped|https://github.com/apache/spark/blob/branch-1.6/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala#L44],
which is only called by 
[StreamingContext#stop()|https://github.com/apache/spark/blob/branch-1.6/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala#L714].
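
In code, a hedged sketch of the intended behavior (not the actual 
{{SparkPipelineResult}} implementation) would map a timed-out wait to a real 
state:

{code:java}
import org.apache.beam.sdk.PipelineResult;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

final class StreamingTermination {
  static PipelineResult.State await(JavaStreamingContext jssc, long timeoutMillis) {
    // awaitTerminationOrTimeout returns true only once the context was
    // stopped (via StreamingContext#stop()); a plain timeout yields false.
    boolean stopped = jssc.awaitTerminationOrTimeout(timeoutMillis);
    return stopped ? PipelineResult.State.DONE : PipelineResult.State.RUNNING;
  }
}
{code}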



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (BEAM-1177) Input DStream "bundles" should be in serialized form and include relevant metadata.

2017-01-10 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela closed BEAM-1177.
---

> Input DStream "bundles" should be in serialized form and include relevant 
> metadata.
> ---
>
> Key: BEAM-1177
> URL: https://issues.apache.org/jira/browse/BEAM-1177
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
> Fix For: 0.5.0
>
>
> Currently, the input partitions hold "bundles" of read elements within the 
> {{mapWithStateDStream}} used for the read.
> Since this is automatically shuffled, user-data (the read elements) should be 
> serialized using coders to avoid breakage when the user-data is not {{Kryo}} 
> serializable.
> Even after BEAM-848 is complete, the resulting {{MapWithStateDStream}} would 
> be checkpointed periodically, so it would still have to remain in serialized 
> form.
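
For illustration, encoding elements with their coder before they enter the 
shuffled and checkpointed state could look like this sketch ({{CoderUtils}} is 
Beam's helper; the surrounding bookkeeping is elided):

{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.util.CoderUtils;

final class SerializedBundles {
  // Coder-encoded bytes survive Spark's shuffle and checkpointing even when
  // the user's element type is not Kryo-serializable.
  static <T> List<byte[]> encode(Iterable<T> elements, Coder<T> coder)
      throws CoderException {
    List<byte[]> bytes = new ArrayList<>();
    for (T element : elements) {
      bytes.add(CoderUtils.encodeToByteArray(coder, element));
    }
    return bytes;
  }
}
{code}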



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (BEAM-1250) Remove leaf when materializing PCollection to avoid re-evaluation.

2017-01-07 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela closed BEAM-1250.
---
   Resolution: Fixed
Fix Version/s: 0.5.0

> Remove leaf when materializing PCollection to avoid re-evaluation.
> --
>
> Key: BEAM-1250
> URL: https://issues.apache.org/jira/browse/BEAM-1250
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
> Fix For: 0.5.0
>
>
> When materializing a {{PCollection}} (implemented as {{RDD}}), to create a 
> {{PCollectionView}} for example, the runner should remove the materialized 
> {{RDD}} from the "leaves" set.
> The runner keeps track of leaves left un-handled in the DAG in order to force 
> an action on them - {{Write}}, for one, is implemented via a sequence of 
> ParDos, which the runner translates via {{mapPartitions}}, so an action must 
> be forced.
> Materializing an {{RDD}} is done via the action {{collect()}}, so there is no 
> reason to keep it in the "leaves" set.
> Currently, it remains in the "leaves" set, so the action is forced and the 
> lineage is re-evaluated; if not cached, the lineage executes twice (unless 
> caching was applied for some other reason).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-1250) Remove leaf when materializing PCollection to avoid re-evaluation.

2017-01-06 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela updated BEAM-1250:

Description: 
When materializing a {{PCollection}} (implemented as {{RDD}}), to create a 
{{PCollectionView}} for example, the runner should remove the materialized 
{{RDD}} from the "leaves" set.
The runner keeps track of leaves left un-handled in the DAG in order to force 
an action on them - {{Write}}, for one, is implemented via a sequence of 
ParDos, which the runner translates via {{mapPartitions}}, so an action must be 
forced.
Materializing an {{RDD}} is done via the action {{collect()}}, so there is no 
reason to keep it in the "leaves" set.
Currently, it remains in the "leaves" set, so the action is forced and the 
lineage is re-evaluated; if not cached, the lineage executes twice (unless 
caching was applied for some other reason).

  was:
When materializing a {{PCollection}} (implemented as {{RDD}}), to create a 
{{PCollectionView}} for example, the runner should remove the materialized 
{{RDD}} from the "leaves" set.
The runner keeps track of leaves left un-handled in the DAG in order to force 
an action on them - {{Write}}, for one, is implemented via a sequence of 
{{ParDo}}s, which the runner translates via {{mapPartitions}}, so an action 
must be forced.
Materializing an {{RDD}} is done via the action {{collect()}}, so there is no 
reason to keep it in the "leaves" set.
Currently, it remains in the "leaves" set, so the action is forced and the 
lineage is re-evaluated; if not cached, the lineage executes twice (unless 
caching was applied for some other reason).


> Remove leaf when materializing PCollection to avoid re-evaluation.
> --
>
> Key: BEAM-1250
> URL: https://issues.apache.org/jira/browse/BEAM-1250
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
>
> When materializing a {{PCollection}} (implemented as {{RDD}}), to create a 
> {{PCollectionView}} for example, the runner should remove the materialized 
> {{RDD}} from the "leaves" set.
> The runner keeps track of leaves left un-handled in the DAG in order to force 
> an action on them - {{Write}}, for one, is implemented via a sequence of 
> ParDos, which the runner translates via {{mapPartitions}}, so an action must 
> be forced.
> Materializing an {{RDD}} is done via the action {{collect()}}, so there is no 
> reason to keep it in the "leaves" set.
> Currently, it remains in the "leaves" set, so the action is forced and the 
> lineage is re-evaluated; if not cached, the lineage executes twice (unless 
> caching was applied for some other reason).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (BEAM-1250) Remove leaf when materializing PCollection to avoid re-evaluation.

2017-01-06 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1250:
---

 Summary: Remove leaf when materializing PCollection to avoid 
re-evaluation.
 Key: BEAM-1250
 URL: https://issues.apache.org/jira/browse/BEAM-1250
 Project: Beam
  Issue Type: Bug
  Components: runner-spark
Reporter: Amit Sela
Assignee: Amit Sela


When materializing a {{PCollection}} (implemented as {{RDD}}), to create a 
{{PCollectionView}} for example, the runner should remove the materialized 
{{RDD}} from the "leaves" set.
The runner keeps track of leaves left un-handled in the DAG in order to force 
an action on them - {{Write}}, for one, is implemented via a sequence of 
{{ParDo}}s, which the runner translates via {{mapPartitions}}, so an action 
must be forced.
Materializing an {{RDD}} is done via the action {{collect()}}, so there is no 
reason to keep it in the "leaves" set.
Currently, it remains in the "leaves" set, so the action is forced and the 
lineage is re-evaluated; if not cached, the lineage executes twice (unless 
caching was applied for some other reason).
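
A simplified sketch of the bookkeeping described above ({{LeavesTracker}} is a 
hypothetical stand-in for the runner's actual evaluation context):

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.spark.api.java.JavaRDD;

final class LeavesTracker {
  private final Set<JavaRDD<?>> leaves = new HashSet<>();

  void track(JavaRDD<?> rdd) {
    leaves.add(rdd);
  }

  <T> List<T> materialize(JavaRDD<T> rdd) {
    // collect() is itself a Spark action, so this RDD no longer needs a
    // forced action later; leaving it in the set would evaluate its
    // lineage a second time.
    leaves.remove(rdd);
    return rdd.collect();
  }

  void forceRemaining() {
    // Force an action on untranslated leaves (e.g. Write implemented via ParDos).
    for (JavaRDD<?> rdd : leaves) {
      rdd.count();
    }
  }
}
{code}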



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-649) Cache any PCollection implementation if accessed more than once

2017-01-04 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela updated BEAM-649:
---
Description: 
Currently, the runner will cache any {{PCollection}} implementation - {{RDD}} 
or {{DStream}} - only when it is accessed a second time.
This can be further optimized by caching right after the first evaluation when 
the collection is known to be accessed again, which would also solve the issues 
in BEAM-1206.

  was:
Spark will execute a pipeline ONLY if it's triggered by an action (batch) / 
output operation (streaming) - 
http://spark.apache.org/docs/1.6.2/streaming-programming-guide.html#output-operations-on-dstreams.

Currently, such actions in Beam are mostly implemented via ParDo, and 
translated by the runner as a Map transformation (via mapPartitions).

The runner overcomes this by "forcing" actions on untranslated leaves.
While this is OK, it would be better in some cases, e.g., Sinks, to apply the 
same ParDo translation but with foreach/foreachRDD instead of 
foreachPartition/mapPartitions.


> Cache any PCollection implementation if accessed more than once
> ---
>
> Key: BEAM-649
> URL: https://issues.apache.org/jira/browse/BEAM-649
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Jean-Baptiste Onofré
>
> Currently, the runner will cache any {{PCollection}} implementation - {{RDD}} 
> or {{DStream}} - only when it is accessed a second time.
> This can be further optimized by caching right after the first evaluation 
> when the collection is known to be accessed again, which would also solve the 
> issues in BEAM-1206.
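
Sketched roughly ({{DatasetCache}} and its parameters are hypothetical, not the 
runner's actual evaluation context), the optimization is to persist right after 
the first evaluation whenever the DAG already shows a later access:

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;

final class DatasetCache {
  private final Map<String, JavaRDD<?>> datasets = new HashMap<>();

  void put(String pcollectionId, JavaRDD<?> rdd, int plannedAccesses) {
    // If the DAG tells us up front that the collection is consumed more than
    // once, persist right after the first evaluation instead of waiting for
    // the second access to come around.
    if (plannedAccesses > 1) {
      rdd.persist(StorageLevel.MEMORY_ONLY());
    }
    datasets.put(pcollectionId, rdd);
  }

  JavaRDD<?> get(String pcollectionId) {
    return datasets.get(pcollectionId);
  }
}
{code}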



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-649) Cache any PCollection implementation if accessed more than once

2017-01-04 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela updated BEAM-649:
---
Summary: Cache any PCollection implementation if accessed more than once  
(was: Pipeline "actions" should use foreachRDD via ParDo.)

> Cache any PCollection implementation if accessed more than once
> ---
>
> Key: BEAM-649
> URL: https://issues.apache.org/jira/browse/BEAM-649
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Jean-Baptiste Onofré
>
> Spark will execute a pipeline ONLY if it's triggered by an action (batch) / 
> output operation (streaming) - 
> http://spark.apache.org/docs/1.6.2/streaming-programming-guide.html#output-operations-on-dstreams.
> Currently, such actions in Beam are mostly implemented via ParDo, and 
> translated by the runner as a Map transformation (via mapPartitions).
> The runner overcomes this by "forcing" actions on untranslated leaves.
> While this is OK, it would be better in some cases, e.g., Sinks, to apply the 
> same ParDo translation but with foreach/foreachRDD instead of 
> foreachPartition/mapPartitions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-18) Add support for new Beam Sink API

2017-01-04 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-18?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15799073#comment-15799073
 ] 

Amit Sela commented on BEAM-18:
---

Since {{Sink}} is not a {{PTransform}} but is made up of several {{ParDo}}s, it 
is hard to figure out when to apply the action.
We could optimize in cases where a Beam {{Write}} transformation can be 
directly translated to a Spark one.
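
For the streaming case, the direct-translation idea could look like this hedged 
sketch (method names are ours, not the runner's): translate the write as a 
Spark output operation rather than a lazy map.

{code:java}
import java.util.Iterator;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.api.java.JavaDStream;

final class SinkTranslation {
  static <T> void writeViaOutputOperation(JavaDStream<T> stream) {
    // foreachRDD is an output operation, so Spark executes it without the
    // runner having to force an action on an untranslated leaf.
    stream.foreachRDD((JavaRDD<T> rdd) ->
        rdd.foreachPartition((Iterator<T> partition) -> {
          while (partition.hasNext()) {
            T element = partition.next();
            // write element to the sink here
          }
        }));
  }
}
{code}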

> Add support for new Beam Sink API
> -
>
> Key: BEAM-18
> URL: https://issues.apache.org/jira/browse/BEAM-18
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
>
> Support Beam Sinks via Spark {{foreach}} API, Spark's native sinks (if / 
> where) possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (BEAM-1145) Remove classifier from shaded spark runner artifact

2017-01-02 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1145.
-
   Resolution: Fixed
Fix Version/s: 0.5.0

> Remove classifier from shaded spark runner artifact
> ---
>
> Key: BEAM-1145
> URL: https://issues.apache.org/jira/browse/BEAM-1145
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Aviem Zur
> Fix For: 0.5.0
>
>
> Shade plugin configured in spark runner's pom adds a classifier to spark 
> runner shaded jar
> {code:xml}
> <shadedArtifactAttached>true</shadedArtifactAttached>
> <shadedClassifierName>spark-app</shadedClassifierName>
> {code}
> This means that in order for a user application that depends on spark-runner 
> to work in cluster mode, the user has to add the classifier to their 
> dependency declaration, like so:
> {code:xml}
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-runners-spark</artifactId>
>   <version>0.4.0-incubating-SNAPSHOT</version>
>   <classifier>spark-app</classifier>
> </dependency>
> {code}
> Otherwise, if they do not specify the classifier, the jar they get is 
> unshaded, which in cluster mode causes collisions between different guava 
> versions.
> Example exception in cluster mode when adding the dependency without 
> classifier:
> {code}
> 16/12/12 06:58:56 WARN TaskSetManager: Lost task 4.0 in stage 8.0 (TID 153, 
> lvsriskng02.lvs.paypal.com): java.lang.NoSuchMethodError: 
> com.google.common.base.Stopwatch.createStarted()Lcom/google/common/base/Stopwatch;
>   at 
> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:137)
>   at 
> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:98)
>   at 
> org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>   at 
> org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:155)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> I would suggest that the classifier be removed from the shaded jar, to avoid 
> confusion among users and provide a better user experience.
> P.S. It looks like the Dataflow runner does not add a classifier to its 
> shaded jar.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (BEAM-1146) Decrease spark runner startup overhead

2017-01-02 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1146.
-
   Resolution: Fixed
Fix Version/s: 0.5.0

> Decrease spark runner startup overhead
> --
>
> Key: BEAM-1146
> URL: https://issues.apache.org/jira/browse/BEAM-1146
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Aviem Zur
> Fix For: 0.5.0
>
>
> BEAM-921 introduced a lazy singleton, instantiated once on each machine 
> (driver & executors), which uses reflection to find all subclasses of Source 
> and Coder.
> While this is beneficial in its own right, the change added about one minute 
> of overhead to spark runner startup time (causing the first job/stage to take 
> up to a minute).
> The change is in class {{BeamSparkRunnerRegistrator}}.
> The reason reflection (specifically the reflections library) was used here is 
> that there is currently no way of knowing all the source and coder classes at 
> runtime.
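
The scan in question is along these lines (a hedged sketch using the 
org.reflections library mentioned above):

{code:java}
import java.util.Set;

import org.apache.beam.sdk.coders.Coder;
import org.reflections.Reflections;

final class CoderScan {
  @SuppressWarnings("rawtypes")
  static Set<Class<? extends Coder>> findAllCoders() {
    // Walks the classpath for every Coder subclass; doing this lazily once
    // per JVM is what added up to ~1 minute of startup overhead per machine.
    return new Reflections("org.apache.beam").getSubTypesOf(Coder.class);
  }
}
{code}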



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

