[jira] [Assigned] (BEAM-1145) Remove classifier from shaded spark runner artifact

2016-12-21 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur reassigned BEAM-1145:
---

Assignee: Aviem Zur  (was: Amit Sela)

> Remove classifier from shaded spark runner artifact
> ---
>
> Key: BEAM-1145
> URL: https://issues.apache.org/jira/browse/BEAM-1145
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Aviem Zur
>
> The shade plugin configured in the spark runner's pom adds a classifier to the 
> spark runner's shaded jar:
> {code:xml}
> <shadedArtifactAttached>true</shadedArtifactAttached>
> <shadedClassifierName>spark-app</shadedClassifierName>
> {code}
> This means that, in order for a user application that depends on spark-runner 
> to work in cluster mode, users have to add the classifier in their dependency 
> declaration, like so:
> {code:xml}
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-runners-spark</artifactId>
>   <version>0.4.0-incubating-SNAPSHOT</version>
>   <classifier>spark-app</classifier>
> </dependency>
> {code}
> Otherwise, if they do not specify the classifier, the jar they get is 
> unshaded, which in cluster mode causes collisions between different Guava 
> versions.
> Example exception in cluster mode when adding the dependency without the 
> classifier:
> {code}
> 16/12/12 06:58:56 WARN TaskSetManager: Lost task 4.0 in stage 8.0 (TID 153, 
> lvsriskng02.lvs.paypal.com): java.lang.NoSuchMethodError: 
> com.google.common.base.Stopwatch.createStarted()Lcom/google/common/base/Stopwatch;
>   at 
> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:137)
>   at 
> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:98)
>   at 
> org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>   at 
> org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:155)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> I would suggest removing the classifier from the shaded jar, to avoid 
> confusion among users and provide a better user experience.
> P.S. It looks like the Dataflow runner does not add a classifier to its shaded jar.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (BEAM-1144) Spark runner fails to deserialize MicrobatchSource in cluster mode

2016-12-21 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur reassigned BEAM-1144:
---

Assignee: Aviem Zur  (was: Amit Sela)

> Spark runner fails to deserialize MicrobatchSource in cluster mode
> --
>
> Key: BEAM-1144
> URL: https://issues.apache.org/jira/browse/BEAM-1144
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Aviem Zur
>
> When running in cluster mode (yarn), the spark runner fails on deserialization 
> of {{MicrobatchSource}}.
> After the changes made in BEAM-921, the spark runner fails in cluster mode with 
> the following:
> {code}
> 16/12/12 04:27:01 ERROR ApplicationMaster: User class threw exception: 
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
>   at 
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>   at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:115)
>   at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>   at 
> com.paypal.risk.platform.aleph.example.MapOnlyExample.main(MapOnlyExample.java:38)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:559)
> Caused by: com.esotericsoftware.kryo.KryoException: Error during Java 
> deserialization.
>   at 
> com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>   at 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228)
>   at 
> org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169)
>   at 
> org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201)
>   at 
> org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:155)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.beam.runners.spark.io.MicrobatchSource
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>  

[jira] [Assigned] (BEAM-1146) Decrease spark runner startup overhead

2016-12-21 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur reassigned BEAM-1146:
---

Assignee: Aviem Zur  (was: Amit Sela)

> Decrease spark runner startup overhead
> --
>
> Key: BEAM-1146
> URL: https://issues.apache.org/jira/browse/BEAM-1146
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Aviem Zur
>
> BEAM-921 introduced a lazy singleton, instantiated once on each machine 
> (driver & executors), which utilizes reflection to find all subclasses of 
> Source and Coder.
> While this is beneficial in its own right, the change added about one minute 
> of overhead to spark runner startup time (which causes the first job/stage to 
> take up to a minute).
> The change is in class {{BeamSparkRunnerRegistrator}}.
> The reason reflection (specifically, the reflections library) was used here is 
> that there is no current way of knowing all the source and coder classes 
> at runtime.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-1126) Expose UnboundedSource split backlog in number of events

2016-12-14 Thread Aviem Zur (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15747788#comment-15747788
 ] 

Aviem Zur commented on BEAM-1126:
-

Yeah, that logic is sound.
I'd be glad to help with that effort. Are there any open issues to tackle?

> Expose UnboundedSource split backlog in number of events
> 
>
> Key: BEAM-1126
> URL: https://issues.apache.org/jira/browse/BEAM-1126
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Aviem Zur
>Assignee: Daniel Halperin
>Priority: Minor
>
> Today {{UnboundedSource}} exposes split backlog in bytes via 
> {{getSplitBacklogBytes()}}.
> There is value in exposing backlog in number of events as well, since this 
> number can be more human-comprehensible than bytes; something like 
> {{getSplitBacklogEvents()}} or {{getSplitBacklogCount()}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (BEAM-1155) Spark runner aggregators only support a handful of combiners

2016-12-14 Thread Aviem Zur (JIRA)
Aviem Zur created BEAM-1155:
---

 Summary: Spark runner aggregators only support a handful of 
combiners
 Key: BEAM-1155
 URL: https://issues.apache.org/jira/browse/BEAM-1155
 Project: Beam
  Issue Type: Bug
  Components: runner-spark
Reporter: Aviem Zur
Assignee: Amit Sela


Spark runner aggregators only support a handful of combiners.

If your {{CombineFn}} implementation (specifically, a custom {{CombineFn}} 
written by the user for their aggregator) is not one that appears in 
{{org.apache.beam.runners.spark.translation.SparkRuntimeContext#getCoder}}, you 
will get an {{IllegalArgumentException}} in your pipeline.

{code:java}
private Coder getCoder(Combine.CombineFn combiner) {
  try {
    if (combiner.getClass() == Sum.SumIntegerFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
    } else if (combiner.getClass() == Sum.SumLongFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
    } else if (combiner.getClass() == Sum.SumDoubleFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
    } else if (combiner.getClass() == Min.MinIntegerFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
    } else if (combiner.getClass() == Min.MinLongFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
    } else if (combiner.getClass() == Min.MinDoubleFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
    } else if (combiner.getClass() == Max.MaxIntegerFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
    } else if (combiner.getClass() == Max.MaxLongFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
    } else if (combiner.getClass() == Max.MaxDoubleFn.class) {
      return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
    } else {
      throw new IllegalArgumentException(
          "unsupported combiner in Aggregator: " + combiner.getClass().getName());
    }
  } catch (CannotProvideCoderException e) {
    throw new IllegalStateException("Could not determine default coder for combiner", e);
  }
}
{code}
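
A more general approach could resolve the coder from the combiner's output type 
instead of whitelisting concrete classes. A minimal method-level sketch (a 
hypothetical helper which assumes the caller can supply the output 
{{TypeDescriptor}}; not the actual fix):
{code:java}
// Hypothetical alternative: look up the coder by the combiner's output type
// via the CoderRegistry, so any CombineFn whose output type has a registered
// coder is supported, with no class whitelist.
private <OutputT> Coder<OutputT> getCoder(TypeDescriptor<OutputT> outputType) {
  try {
    return getCoderRegistry().getDefaultCoder(outputType);
  } catch (CannotProvideCoderException e) {
    throw new IllegalStateException(
        "Could not determine default coder for combiner output " + outputType, e);
  }
}
{code}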



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (BEAM-1146) Decrease spark runner startup overhead

2016-12-13 Thread Aviem Zur (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15745427#comment-15745427
 ] 

Aviem Zur edited comment on BEAM-1146 at 12/13/16 3:44 PM:
---

Possible solutions:
# Limiting the reflections search criteria to a few specific packages cuts the 
time down to only a few seconds. However, custom user {{Coder}} and {{Source}} 
implementations may not fall within these packages, and could encounter Kryo 
serialization errors.
# Adding an annotation to all coders and sources, so we can register them 
specifically. Similar solutions to similar problems exist in Beam code today 
and utilize ServiceLoader and annotations (for example, to find implementations 
of {{IOChannelFactoryRegistrar}}). However, users will have to know to add 
these annotations to their custom {{Coder}} and {{Source}} implementations as 
well.
# Some combination of the previous 2 solutions.


was (Author: aviemzur):
Possible solutions:
# Limiting the reflections search criteria to a few specific packages cuts the 
time down to 1 second. However, custom user {{Coder}} and {{Source}} 
implementations may not fall within these packages, and could encounter Kryo 
serialization errors.
# Adding an annotation to all coders and sources, so we can register them 
specifically. Similar solutions to similar problems exist in Beam code today 
and utilize ServiceLoader and annotations (for example, to find implementations 
of {{IOChannelFactoryRegistrar}}). However, users will have to know to add 
these annotations to their custom {{Coder}} and {{Source}} implementations as 
well.
# Some combination of the previous 2 solutions.

> Decrease spark runner startup overhead
> --
>
> Key: BEAM-1146
> URL: https://issues.apache.org/jira/browse/BEAM-1146
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Amit Sela
>
> BEAM-921 introduced a lazy singleton, instantiated once on each machine 
> (driver & executors), which utilizes reflection to find all subclasses of 
> Source and Coder.
> While this is beneficial in its own right, the change added about one minute 
> of overhead to spark runner startup time (which causes the first job/stage to 
> take up to a minute).
> The change is in class {{BeamSparkRunnerRegistrator}}.
> The reason reflection (specifically, the reflections library) was used here is 
> that there is no current way of knowing all the source and coder classes 
> at runtime.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (BEAM-1146) Decrease spark runner startup overhead

2016-12-13 Thread Aviem Zur (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15745427#comment-15745427
 ] 

Aviem Zur edited comment on BEAM-1146 at 12/13/16 3:36 PM:
---

Possible solutions:
# Limiting the reflections search criteria to a few specific packages cuts the 
time down to 1 second. However, custom user {{Coder}} and {{Source}} 
implementations may not fall within these packages, and could encounter Kryo 
serialization errors.
# Adding an annotation to all coders and sources, so we can register them 
specifically. Similar solutions to similar problems exist in Beam code today 
and utilize ServiceLoader and annotations (for example, to find implementations 
of {{IOChannelFactoryRegistrar}}). However, users will have to know to add 
these annotations to their custom {{Coder}} and {{Source}} implementations as 
well.
# Some combination of the previous 2 solutions.


was (Author: aviemzur):
Possible solutions:
# Limiting the reflections search criteria to a few specific packages cuts the 
time down to 1 second. However, custom user Coders and Sources may not fall 
within these packages, and could encounter Kryo serialization errors.
# Adding an annotation to all coders and sources, so we can register them 
specifically. Similar solutions to similar problems exist in Beam code today 
and utilize ServiceLoader and annotations (for example, to find implementations 
of {{IOChannelFactoryRegistrar}}).
# Some combination of the previous 2 solutions.

> Decrease spark runner startup overhead
> --
>
> Key: BEAM-1146
> URL: https://issues.apache.org/jira/browse/BEAM-1146
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Amit Sela
>
> BEAM-921 introduced a lazy singleton, instantiated once on each machine 
> (driver & executors), which utilizes reflection to find all subclasses of 
> Source and Coder.
> While this is beneficial in its own right, the change added about one minute 
> of overhead to spark runner startup time (which causes the first job/stage to 
> take up to a minute).
> The change is in class {{BeamSparkRunnerRegistrator}}.
> The reason reflection (specifically, the reflections library) was used here is 
> that there is no current way of knowing all the source and coder classes 
> at runtime.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-1146) Decrease spark runner startup overhead

2016-12-13 Thread Aviem Zur (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15745427#comment-15745427
 ] 

Aviem Zur commented on BEAM-1146:
-

Possible solutions:
# Limiting the reflections search criteria to a few specific packages cuts the 
time down to 1 second. However, custom user Coders and Sources may not fall 
within these packages, and could encounter Kryo serialization errors.
# Adding an annotation to all coders and sources, so we can register them 
specifically. Similar solutions to similar problems exist in Beam code today 
and utilize ServiceLoader and annotations (for example, to find implementations 
of {{IOChannelFactoryRegistrar}}), as sketched below.
# Some combination of the previous 2 solutions.
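
For illustration, a rough sketch of option 2 (all names here are hypothetical, 
not existing Beam APIs): registrar implementations are listed in a 
META-INF/services entry and discovered via ServiceLoader, with no classpath 
scanning at all.
{code:java}
import java.util.ServiceLoader;

import com.esotericsoftware.kryo.Kryo;

/** Hypothetical registrar contract; each implementation ships a ServiceLoader entry. */
public interface KryoClassRegistrar {
  void register(Kryo kryo);
}

/** Sketch of the runner side, replacing reflections-based classpath scanning. */
class RegistrarLoader {
  static void registerAll(Kryo kryo) {
    // Discovers every implementation named in a META-INF/services file
    // for KryoClassRegistrar on the classpath.
    for (KryoClassRegistrar registrar : ServiceLoader.load(KryoClassRegistrar.class)) {
      registrar.register(kryo);
    }
  }
}
{code}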

> Decrease spark runner startup overhead
> --
>
> Key: BEAM-1146
> URL: https://issues.apache.org/jira/browse/BEAM-1146
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Amit Sela
>
> BEAM-921 introduced a lazy singleton, instantiated once on each machine 
> (driver & executors), which utilizes reflection to find all subclasses of 
> Source and Coder.
> While this is beneficial in its own right, the change added about one minute 
> of overhead to spark runner startup time (which causes the first job/stage to 
> take up to a minute).
> The change is in class {{BeamSparkRunnerRegistrator}}.
> The reason reflection (specifically, the reflections library) was used here is 
> that there is no current way of knowing all the source and coder classes 
> at runtime.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-1146) Spark runner ~1m startup overhead

2016-12-13 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur updated BEAM-1146:

Component/s: runner-spark

> Spark runner ~1m startup overhead
> -
>
> Key: BEAM-1146
> URL: https://issues.apache.org/jira/browse/BEAM-1146
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Aviem Zur
>
> BEAM-921 introduced a lazy singleton, instantiated once on each machine 
> (driver & executors), which utilizes reflection to find all subclasses of 
> Source and Coder.
> While this is beneficial in its own right, the change added about one minute 
> of overhead to spark runner startup time (which causes the first job/stage to 
> take up to a minute).
> The change is in class {{BeamSparkRunnerRegistrator}}.
> The reason reflection (specifically, the reflections library) was used here is 
> that there is no current way of knowing all the source and coder classes 
> at runtime.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-1146) Decrease spark runner startup overhead

2016-12-13 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur updated BEAM-1146:

Summary: Decrease spark runner startup overhead  (was: Spark runner ~1m 
startup overhead)

> Decrease spark runner startup overhead
> --
>
> Key: BEAM-1146
> URL: https://issues.apache.org/jira/browse/BEAM-1146
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Amit Sela
>
> BEAM-921 introduced a lazy singleton, instantiated once on each machine 
> (driver & executors), which utilizes reflection to find all subclasses of 
> Source and Coder.
> While this is beneficial in its own right, the change added about one minute 
> of overhead to spark runner startup time (which causes the first job/stage to 
> take up to a minute).
> The change is in class {{BeamSparkRunnerRegistrator}}.
> The reason reflection (specifically, the reflections library) was used here is 
> that there is no current way of knowing all the source and coder classes 
> at runtime.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (BEAM-1146) Spark runner ~1m startup overhead

2016-12-13 Thread Aviem Zur (JIRA)
Aviem Zur created BEAM-1146:
---

 Summary: Spark runner ~1m startup overhead
 Key: BEAM-1146
 URL: https://issues.apache.org/jira/browse/BEAM-1146
 Project: Beam
  Issue Type: Improvement
Reporter: Aviem Zur


BEAM-921 introduced a lazy singleton, instantiated once on each machine (driver 
& executors), which utilizes reflection to find all subclasses of Source and 
Coder.
While this is beneficial in its own right, the change added about one minute 
of overhead to spark runner startup time (which causes the first job/stage to 
take up to a minute).
The change is in class {{BeamSparkRunnerRegistrator}}.
The reason reflection (specifically, the reflections library) was used here is 
that there is no current way of knowing all the source and coder classes at 
runtime.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (BEAM-1145) Remove classifier from shaded spark runner artifact

2016-12-13 Thread Aviem Zur (JIRA)
Aviem Zur created BEAM-1145:
---

 Summary: Remove classifier from shaded spark runner artifact
 Key: BEAM-1145
 URL: https://issues.apache.org/jira/browse/BEAM-1145
 Project: Beam
  Issue Type: Improvement
  Components: runner-spark
Reporter: Aviem Zur
Assignee: Amit Sela


The shade plugin configured in the spark runner's pom adds a classifier to the 
spark runner's shaded jar:
{code:xml}
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>spark-app</shadedClassifierName>
{code}
This means that, in order for a user application that depends on spark-runner 
to work in cluster mode, users have to add the classifier in their dependency 
declaration, like so:
{code:xml}
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>0.4.0-incubating-SNAPSHOT</version>
  <classifier>spark-app</classifier>
</dependency>
{code}
Otherwise, if they do not specify the classifier, the jar they get is unshaded, 
which in cluster mode causes collisions between different Guava versions.
Example exception in cluster mode when adding the dependency without the classifier:
{code}
16/12/12 06:58:56 WARN TaskSetManager: Lost task 4.0 in stage 8.0 (TID 153, 
lvsriskng02.lvs.paypal.com): java.lang.NoSuchMethodError: 
com.google.common.base.Stopwatch.createStarted()Lcom/google/common/base/Stopwatch;
at 
org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:137)
at 
org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:98)
at 
org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
at 
org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
at 
org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
at 
org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at 
org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at 
org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
at 
org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:155)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
at 
org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}
I would suggest removing the classifier from the shaded jar, to avoid confusion 
among users and provide a better user experience.
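
A minimal sketch of what the pom change might look like (assuming the standard 
maven-shade-plugin parameters shown above; illustrative, not the actual patch): 
stop attaching the shaded jar under a classifier and let it replace the main 
artifact instead.
{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <!-- Illustrative: with the shaded jar replacing the main artifact,
         users depend on beam-runners-spark with no classifier. -->
    <shadedArtifactAttached>false</shadedArtifactAttached>
  </configuration>
</plugin>
{code}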

P.S. Looks like Dataflow runner does not add a classifier to its shaded jar.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-1144) Spark runner fails to deserialize MicrobatchSource in cluster mode

2016-12-13 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur updated BEAM-1144:

Priority: Major  (was: Blocker)

> Spark runner fails to deserialize MicrobatchSource in cluster mode
> --
>
> Key: BEAM-1144
> URL: https://issues.apache.org/jira/browse/BEAM-1144
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Amit Sela
>
> When running in cluster mode (yarn), the spark runner fails on deserialization 
> of {{MicrobatchSource}}.
> After the changes made in BEAM-921, the spark runner fails in cluster mode with 
> the following:
> {code}
> 16/12/12 04:27:01 ERROR ApplicationMaster: User class threw exception: 
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
>   at 
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>   at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:115)
>   at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>   at 
> com.paypal.risk.platform.aleph.example.MapOnlyExample.main(MapOnlyExample.java:38)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:559)
> Caused by: com.esotericsoftware.kryo.KryoException: Error during Java 
> deserialization.
>   at 
> com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>   at 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228)
>   at 
> org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169)
>   at 
> org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201)
>   at 
> org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:155)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.beam.runners.spark.io.MicrobatchSource
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>   at 

[jira] [Commented] (BEAM-1144) Spark runner fails to deserialize MicrobatchSource in cluster mode

2016-12-13 Thread Aviem Zur (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15745377#comment-15745377
 ] 

Aviem Zur commented on BEAM-1144:
-

I've made a working solution here: 
https://github.com/apache/incubator-beam/compare/master...aviemzur:cnf-deserialize-issue

However, I cannot seem to reproduce the issue in Spark alone (without Beam 
code), so we may want to see if there is something we are doing incorrectly and 
solve it another way.
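
For context, one common pattern for this class of failure (a sketch under 
assumptions only; the branch linked above may take a different approach) is to 
make Java deserialization resolve classes against the thread context class 
loader, which on YARN executors can see the user jar:
{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

/**
 * Illustrative only: an ObjectInputStream that prefers the thread context
 * class loader, so classes like MicrobatchSource resolve even when the
 * default class loader cannot see the user jar.
 */
public class ContextClassLoaderObjectInputStream extends ObjectInputStream {

  public ContextClassLoaderObjectInputStream(InputStream in) throws IOException {
    super(in);
  }

  @Override
  protected Class<?> resolveClass(ObjectStreamClass desc)
      throws IOException, ClassNotFoundException {
    try {
      // Spark sets the context class loader to one that includes user classes.
      return Class.forName(
          desc.getName(), false, Thread.currentThread().getContextClassLoader());
    } catch (ClassNotFoundException e) {
      // Fall back to the default resolution behavior.
      return super.resolveClass(desc);
    }
  }
}
{code}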

> Spark runner fails to deserialize MicrobatchSource in cluster mode
> --
>
> Key: BEAM-1144
> URL: https://issues.apache.org/jira/browse/BEAM-1144
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Amit Sela
>Priority: Blocker
>
> When running in cluster mode (yarn), the spark runner fails on deserialization 
> of {{MicrobatchSource}}.
> After the changes made in BEAM-921, the spark runner fails in cluster mode with 
> the following:
> {code}
> 16/12/12 04:27:01 ERROR ApplicationMaster: User class threw exception: 
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
>   at 
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>   at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:115)
>   at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>   at 
> com.paypal.risk.platform.aleph.example.MapOnlyExample.main(MapOnlyExample.java:38)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:559)
> Caused by: com.esotericsoftware.kryo.KryoException: Error during Java 
> deserialization.
>   at 
> com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>   at 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228)
>   at 
> org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169)
>   at 
> org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201)
>   at 
> org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:155)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at 

[jira] [Commented] (BEAM-1126) Expose UnboundedSource split backlog in number of events

2016-12-12 Thread Aviem Zur (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15744487#comment-15744487
 ] 

Aviem Zur commented on BEAM-1126:
-

The intent is indeed to report number of events via a metric/aggregator, but 
the context for this number is inside the {{UnboundedSource}} implementation, 
which is why exposing this number via a method is required.

> Expose UnboundedSource split backlog in number of events
> 
>
> Key: BEAM-1126
> URL: https://issues.apache.org/jira/browse/BEAM-1126
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Aviem Zur
>Assignee: Daniel Halperin
>Priority: Minor
>
> Today {{UnboundedSource}} exposes split backlog in bytes via 
> {{getSplitBacklogBytes()}}.
> There is value in exposing backlog in number of events as well, since this 
> number can be more human-comprehensible than bytes; something like 
> {{getSplitBacklogEvents()}} or {{getSplitBacklogCount()}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-1126) Expose UnboundedSource split backlog in number of events

2016-12-11 Thread Aviem Zur (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15739694#comment-15739694
 ] 

Aviem Zur commented on BEAM-1126:
-

Reasoning: 
The backlog accessors are a very good indicator for application monitoring. As 
such, we plan to expose backlog as aggregators in spark-runner. 
Number of events is more human-comprehensible than bytes. Specifically, in 
Kafka, backlog (or lag) is reasoned about in {{number of messages}}. See: 
https://kafka.apache.org/documentation#others_monitoring
If I understand correctly, for {{PubSub}} it is more common to reason about 
backlog in bytes; however, the implementation for {{KafkaIO}} seems forced, 
applying a byte approximation to a value that is originally in {{number of 
messages}}:
{code:java}
synchronized long approxBacklogInBytes() {
  // Note that this is an estimate of uncompressed backlog.
  if (latestOffset < 0 || nextOffset < 0) {
    return UnboundedReader.BACKLOG_UNKNOWN;
  }
  return Math.max(0, (long) ((latestOffset - nextOffset) * avgRecordSize));
}
{code}
In conclusion, it seems that the API was written with {{PubSub}} in mind; 
however, {{Kafka}}, the open-source equivalent, relates to backlog in terms of 
{{number of messages}}. 
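
To illustrate, a sketch of what the proposed accessor could look like in 
{{KafkaIO}} (the method name follows the suggestion above; the exact signature 
is an assumption): for Kafka, the backlog in messages is simply the offset gap, 
with no byte approximation needed.
{code:java}
// Hypothetical counterpart to approxBacklogInBytes(): backlog in messages.
synchronized long getSplitBacklogEvents() {
  if (latestOffset < 0 || nextOffset < 0) {
    return UnboundedReader.BACKLOG_UNKNOWN;
  }
  // The offset gap is exactly the number of not-yet-read messages.
  return Math.max(0, latestOffset - nextOffset);
}
{code}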

> Expose UnboundedSource split backlog in number of events
> 
>
> Key: BEAM-1126
> URL: https://issues.apache.org/jira/browse/BEAM-1126
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Aviem Zur
>Assignee: Daniel Halperin
>Priority: Minor
>
> Today {{UnboundedSource}} exposes split backlog in bytes via 
> {{getSplitBacklogBytes()}}.
> There is value in exposing backlog in number of events as well, since this 
> number can be more human-comprehensible than bytes; something like 
> {{getSplitBacklogEvents()}} or {{getSplitBacklogCount()}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-599) Return KafkaIO getWatermark log in debug mode

2016-12-10 Thread Aviem Zur (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15738144#comment-15738144
 ] 

Aviem Zur commented on BEAM-599:


The PR was merged. I think this issue can now be marked resolved.

> Return KafkaIO getWatermark log in debug mode
> -
>
> Key: BEAM-599
> URL: https://issues.apache.org/jira/browse/BEAM-599
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Aviem Zur
>Priority: Minor
>
> https://issues.apache.org/jira/browse/BEAM-574 removes the getWatermark log 
> line from KafkaIO.
> PR: https://github.com/apache/incubator-beam/pull/859
> I actually found this log line useful; instead of removing it completely, can 
> we return this log line but change the log level to 'debug'?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-1126) Expose UnboundedSource split backlog in number of events

2016-12-10 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur updated BEAM-1126:

Description: 
Today {{UnboundedSource}} exposes split backlog in bytes via 
{{getSplitBacklogBytes()}}.

There is value in exposing backlog in number of events as well, since this 
number can be more human-comprehensible than bytes; something like 
{{getSplitBacklogEvents()}} or {{getSplitBacklogCount()}}.

  was:
Today +UnboundedSource+ exposes split backlog in bytes via 
getSplitBacklogBytes().

There is value in exposing backlog in number of events as well, since this 
number can be more human-comprehensible than bytes; something like 
getSplitBacklogEvents() or getSplitBacklogCount().


> Expose UnboundedSource split backlog in number of events
> 
>
> Key: BEAM-1126
> URL: https://issues.apache.org/jira/browse/BEAM-1126
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Aviem Zur
>Assignee: Daniel Halperin
>Priority: Minor
>
> Today {{UnboundedSource}} exposes split backlog in bytes via 
> {{getSplitBacklogBytes()}}.
> There is value in exposing backlog in number of events as well, since this 
> number can be more human-comprehensible than bytes; something like 
> {{getSplitBacklogEvents()}} or {{getSplitBacklogCount()}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-1126) Expose UnboundedSource split backlog in number of events

2016-12-10 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur updated BEAM-1126:

Description: 
Today +UnboundedSource+ exposes split backlog in bytes via 
getSplitBacklogBytes().

There is value in exposing backlog in number of events as well, since this 
number can be more human-comprehensible than bytes; something like 
getSplitBacklogEvents() or getSplitBacklogCount().

  was:
Today UnboundedSource exposes split backlog in bytes via getSplitBacklogBytes().

There is value in exposing backlog in number of events as well, since this 
number can be more human-comprehensible than bytes; something like 
getSplitBacklogEvents() or getSplitBacklogCount().


> Expose UnboundedSource split backlog in number of events
> 
>
> Key: BEAM-1126
> URL: https://issues.apache.org/jira/browse/BEAM-1126
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Aviem Zur
>Assignee: Daniel Halperin
>Priority: Minor
>
> Today +UnboundedSource+ exposes split backlog in bytes via 
> getSplitBacklogBytes().
> There is value in exposing backlog in number of events as well, since this 
> number can be more human-comprehensible than bytes; something like 
> getSplitBacklogEvents() or getSplitBacklogCount().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (BEAM-1126) Expose UnboundedSource split backlog in number of events

2016-12-10 Thread Aviem Zur (JIRA)
Aviem Zur created BEAM-1126:
---

 Summary: Expose UnboundedSource split backlog in number of events
 Key: BEAM-1126
 URL: https://issues.apache.org/jira/browse/BEAM-1126
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-core
Reporter: Aviem Zur
Assignee: Davor Bonaci
Priority: Minor


Today UnboundedSource exposes split backlog in bytes via getSplitBacklogBytes().

There is value in exposing backlog in number of events as well, since this 
number can be more human-comprehensible than bytes; something like 
getSplitBacklogEvents() or getSplitBacklogCount().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-1052) UnboundedSource splitId uniqueness breaks if more than one source is used.

2016-11-28 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur updated BEAM-1052:

Description: 
We use a "running-id" to identify source splits, but we reiterate for each 
source evaluated.
Spark already assigns a unique id per InputDStream, it would be unique enough 
if we replace {{MicrobatchSource}} hash code with one containing both the 
running-id and the InputDStream id.

  was:
We use a "running-id" to identify source splits, but we reiterate for each 
source evaluated.
Spark already assigns a unique id per InputDStream; it would be unique enough 
if we replaced the MicrobatchSource hash code with one containing both the 
running-id and the InputDStream id.


> UnboundedSource splitId uniqueness breaks if more than one source is used.
> --
>
> Key: BEAM-1052
> URL: https://issues.apache.org/jira/browse/BEAM-1052
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Aviem Zur
>
> We use a "running-id" to identify source splits, but we reiterate for each 
> source evaluated.
> Spark already assigns a unique id per InputDStream; it would be unique enough 
> if we replaced the {{MicrobatchSource}} hash code with one containing both the 
> running-id and the InputDStream id.
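
For illustration, a sketch of a hash code combining both ids (the class and 
field names are assumptions, not the actual Beam change):
{code:java}
import java.util.Objects;

/** Illustrative only: identifies a split by InputDStream id plus running id. */
class SplitId {
  private final int inputDStreamId; // unique per InputDStream, assigned by Spark
  private final int runningId;      // split index within a single source

  SplitId(int inputDStreamId, int runningId) {
    this.inputDStreamId = inputDStreamId;
    this.runningId = runningId;
  }

  @Override
  public int hashCode() {
    // Combining both ids keeps splits from different sources distinct.
    return Objects.hash(inputDStreamId, runningId);
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof SplitId)) {
      return false;
    }
    SplitId that = (SplitId) o;
    return inputDStreamId == that.inputDStreamId && runningId == that.runningId;
  }
}
{code}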



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-1052) UnboundedSource splitId uniqueness breaks if more than one source is used.

2016-11-28 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur updated BEAM-1052:

Description: 
We use a "running-id" to identify source splits, but we reiterate for each 
source evaluated.
Spark already assigns a unique id per InputDStream, it would be unique enough 
if we replace MicrobatchSource hash code with one containing both the 
running-id and the InputDStream id.

  was:
We use a "running-id" to identify source splits, but we reiterate for each 
source evaluated.
Spark already assigns a unique id per InputDStream; it would be unique enough 
if we replaced the ```MicrobatchSource``` hash code with one containing both the 
running-id and the InputDStream id.


> UnboundedSource splitId uniqueness breaks if more than one source is used.
> --
>
> Key: BEAM-1052
> URL: https://issues.apache.org/jira/browse/BEAM-1052
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Aviem Zur
>
> We use a "running-id" to identify source splits, but we reiterate for each 
> source evaluated.
> Spark already assigns a unique id per InputDStream; it would be unique enough 
> if we replaced the MicrobatchSource hash code with one containing both the 
> running-id and the InputDStream id.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-1052) UnboundedSource splitId uniqueness breaks if more than one source is used.

2016-11-28 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur updated BEAM-1052:

Description: 
We use a "running-id" to identify source splits, but we reiterate for each 
source evaluated.
Spark already assigns a unique id per InputDStream, it would be unique enough 
if we replace ```MicrobatchSource``` hash code with one containing both the 
running-id and the InputDStream id.

  was:
We use a "running-id" to identify source splits, but we reiterate for each 
source evaluated.
Spark already assigns a unique id per InputDStream; it would be unique enough 
if we replaced the int ids with a String such as: InputDStream.id() + "_" + 
running-id.


> UnboundedSource splitId uniqueness breaks if more than one source is used.
> --
>
> Key: BEAM-1052
> URL: https://issues.apache.org/jira/browse/BEAM-1052
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Aviem Zur
>
> We use a "running-id" to identify source splits, but we reiterate for each 
> source evaluated.
> Spark already assigns a unique id per InputDStream; it would be unique enough 
> if we replaced the ```MicrobatchSource``` hash code with one containing both the 
> running-id and the InputDStream id.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-626) AvroCoder not deserializing correctly in Kryo

2016-09-12 Thread Aviem Zur (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15484824#comment-15484824
 ] 

Aviem Zur commented on BEAM-626:


I believe the place to make this fix is AvroCoder, since it is the only class 
in Beam to use writeReplace and readResolve. It is the only coder we have 
problems with in Kryo (using the Spark runner), while other coders, such as 
ProtoCoder, serialize correctly and lazily initialize their transient members. 
This is why I thought the solution should be in AvroCoder rather than 
manipulating Kryo, as this problem can arise in other runners.

> AvroCoder not deserializing correctly in Kryo
> -
>
> Key: BEAM-626
> URL: https://issues.apache.org/jira/browse/BEAM-626
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Aviem Zur
>Assignee: Davor Bonaci
>Priority: Minor
>
> Unlike with Java serialization, when deserializing AvroCoder using Kryo, the 
> resulting AvroCoder is missing all of its transient fields.
> The reason it works with Java serialization is the usage of writeReplace and 
> readResolve, which Kryo does not adhere to.
> In ProtoCoder, for example, there are also unserializable members; the way it 
> is solved there is by lazily initializing these members via their getters, so 
> they are initialized in the deserialized object on the first call to the member.
> It seems AvroCoder is the only class in Beam to use the writeReplace convention.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-626) AvroCoder not deserializing correctly in Kryo

2016-09-12 Thread Aviem Zur (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15484717#comment-15484717
 ] 

Aviem Zur commented on BEAM-626:


It does have such a fallback, but it seems to apply only to Externalizable 
classes: 
https://github.com/EsotericSoftware/kryo/blob/cef15a3dc55e74162399fce163e19d4845a9f890/src/com/esotericsoftware/kryo/serializers/ExternalizableSerializer.java


> AvroCoder not deserializing correctly in Kryo
> -
>
> Key: BEAM-626
> URL: https://issues.apache.org/jira/browse/BEAM-626
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Aviem Zur
>Assignee: Davor Bonaci
>Priority: Minor
>
> Unlike with Java serialization, when deserializing AvroCoder using Kryo, the 
> resulting AvroCoder is missing all of its transient fields.
> The reason it works with Java serialization is the usage of writeReplace and 
> readResolve, which Kryo does not adhere to.
> In ProtoCoder, for example, there are also unserializable members; the way it 
> is solved there is by lazily initializing these members via their getters, so 
> they are initialized in the deserialized object on the first call to the member.
> It seems AvroCoder is the only class in Beam to use the writeReplace convention.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (BEAM-626) AvroCoder not deserializing correctly in Kryo

2016-09-11 Thread Aviem Zur (JIRA)
Aviem Zur created BEAM-626:
--

 Summary: AvroCoder not deserializing correctly in Kryo
 Key: BEAM-626
 URL: https://issues.apache.org/jira/browse/BEAM-626
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-core
Reporter: Aviem Zur
Assignee: Davor Bonaci
Priority: Minor


Unlike with Java serialization, when deserializing AvroCoder using Kryo, the 
resulting AvroCoder is missing all of its transient fields.

The reason it works with Java serialization is the usage of writeReplace and 
readResolve, which Kryo does not adhere to.

In ProtoCoder, for example, there are also unserializable members; the way it is 
solved there is by lazily initializing these members via their getters, so they 
are initialized in the deserialized object on the first call to the member.

It seems AvroCoder is the only class in Beam to use the writeReplace convention.
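
A minimal sketch of the lazy-initialization pattern described above 
(illustrative class and field names, not the actual AvroCoder internals): the 
transient member is rebuilt on first access, so it survives any serializer that 
skips writeReplace/readResolve.
{code:java}
import java.io.Serializable;
import org.apache.avro.Schema;

/** Illustrative holder showing the ProtoCoder-style lazy-init pattern. */
class LazySchemaHolder implements Serializable {
  private final String schemaString; // serializable source of truth
  private transient Schema schema;   // dropped by Kryo; rebuilt on demand

  LazySchemaHolder(Schema schema) {
    this.schemaString = schema.toString();
    this.schema = schema;
  }

  Schema getSchema() {
    if (schema == null) {
      // Rebuild transient state on first use after deserialization,
      // regardless of whether writeReplace/readResolve ever ran.
      schema = new Schema.Parser().parse(schemaString);
    }
    return schema;
  }
}
{code}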



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-599) Return KafkaIO getWatermark log in debug mode

2016-08-28 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur updated BEAM-599:
---
Description: 
https://issues.apache.org/jira/browse/BEAM-574 removes the getWatermark log 
line from KafkaIO.
PR: https://github.com/apache/incubator-beam/pull/859

I actually found this log line useful; instead of removing it completely, can we 
return this log line but change the log level to 'debug'?

  was:
https://issues.apache.org/jira/browse/BEAM-574
Removes the getWatermark log line from KafkaIO.
PR: https://github.com/apache/incubator-beam/pull/859

I actually found this log line useful; instead of removing it completely, can we 
return this log line but change the log level to 'debug'?


> Return KafkaIO getWatermark log in debug mode
> -
>
> Key: BEAM-599
> URL: https://issues.apache.org/jira/browse/BEAM-599
> Project: Beam
>  Issue Type: Improvement
>Reporter: Aviem Zur
>Priority: Minor
>
> https://issues.apache.org/jira/browse/BEAM-574 removes the getWatermark log 
> line from KafkaIO.
> PR: https://github.com/apache/incubator-beam/pull/859
> I actually found this log line useful; instead of removing it completely, can 
> we return this log line but change the log level to 'debug'?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (BEAM-599) Return KafkaIO getWatermark log in debug mode

2016-08-28 Thread Aviem Zur (JIRA)
Aviem Zur created BEAM-599:
--

 Summary: Return KafkaIO getWatermark log in debug mode
 Key: BEAM-599
 URL: https://issues.apache.org/jira/browse/BEAM-599
 Project: Beam
  Issue Type: Improvement
Reporter: Aviem Zur
Priority: Minor


https://issues.apache.org/jira/browse/BEAM-574
Removes the getWatermark log line from KafkaIO.
PR: https://github.com/apache/incubator-beam/pull/859

I actually found this log line useful; instead of removing it completely, can we 
return this log line but change the log level to 'debug'?
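
A sketch of the suggested change (illustrative class and message; the actual 
KafkaIO line may differ): keep the log call, but emit it at debug level via 
SLF4J so it is silent by default.
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class WatermarkLogging {
  private static final Logger LOG = LoggerFactory.getLogger(WatermarkLogging.class);

  void logWatermark(long watermarkMillis) {
    // Debug level: visible when explicitly enabled, silent otherwise.
    LOG.debug("Returning watermark {}", watermarkMillis);
  }
}
{code}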



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (BEAM-592) StackOverflowError Failed example/java/WordCount When Using SparkRunner

2016-08-27 Thread Aviem Zur (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15441229#comment-15441229
 ] 

Aviem Zur edited comment on BEAM-592 at 8/27/16 10:39 AM:
--

Yeah, I encountered this as well, and that is the way to solve it.

Cause:
jul-to-slf4j.jar and slf4j-jdk14.jar cannot be present simultaneously
http://slf4j.org/legacy.html#jul-to-slf4j


was (Author: aviemzur):
Yeah, I encountered this as well, andthat is the way to solve it.

Cause:
jul-to-slf4j.jar and slf4j-jdk14.jar cannot be present simultaneously
http://slf4j.org/legacy.html#jul-to-slf4j

> StackOverflowError Failed example/java/WordCount When Using SparkRunner
> ---
>
> Key: BEAM-592
> URL: https://issues.apache.org/jira/browse/BEAM-592
> Project: Beam
>  Issue Type: Bug
>Reporter: Mark Liu
>Assignee: Mark Liu
> Fix For: 0.3.0-incubating
>
>
> WordCount (example/java/WordCount) failed when run with the SparkRunner using 
> the following command:
> {code}
> mvn clean compile exec:java 
> -Dexec.mainClass=org.apache.beam.examples.WordCount 
> -Dexec.args="--inputFile=/tmp/kinglear.txt --runner=SparkRunner 
> --sparkMaster=local --tempLocation=/tmp/out"
> {code}
> Following is part of the stack trace:
> {code}
> Caused by: java.lang.StackOverflowError
>   at 
> java.util.concurrent.CopyOnWriteArrayList.toArray(CopyOnWriteArrayList.java:374)
>   at java.util.logging.Logger.accessCheckedHandlers(Logger.java:1782)
>   at 
> java.util.logging.LogManager$RootLogger.accessCheckedHandlers(LogManager.java:1668)
>   at java.util.logging.Logger.getHandlers(Logger.java:1776)
>   at java.util.logging.Logger.log(Logger.java:735)
>   at org.slf4j.impl.JDK14LoggerAdapter.log(JDK14LoggerAdapter.java:580)
>   at org.slf4j.impl.JDK14LoggerAdapter.log(JDK14LoggerAdapter.java:650)
>   at 
> org.slf4j.bridge.SLF4JBridgeHandler.callLocationAwareLogger(SLF4JBridgeHandler.java:224)
>   at 
> org.slf4j.bridge.SLF4JBridgeHandler.publish(SLF4JBridgeHandler.java:301)
>   at java.util.logging.Logger.log(Logger.java:738)
>   at org.slf4j.impl.JDK14LoggerAdapter.log(JDK14LoggerAdapter.java:580)
>   at org.slf4j.impl.JDK14LoggerAdapter.log(JDK14LoggerAdapter.java:650)
>   at 
> org.slf4j.bridge.SLF4JBridgeHandler.callLocationAwareLogger(SLF4JBridgeHandler.java:224)
>   at 
> org.slf4j.bridge.SLF4JBridgeHandler.publish(SLF4JBridgeHandler.java:301)
> ...
> {code}
> According to [http://slf4j.org/legacy.html#jul-to-slf4j], and in particular 
> the section: "jul-to-slf4j.jar and slf4j-jdk14.jar cannot be present 
> simultaneously".
> Changing the slf4j-jdk14 dependency scope to test solves the above problem, but 
> WordCountIT still failed for the same reason. 
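
For illustration, a sketch of the described pom change (the coordinates are the 
real slf4j ones; the version is assumed to be managed elsewhere): keeping 
slf4j-jdk14 off the runtime classpath so it cannot coexist with jul-to-slf4j.
{code:xml}
<!-- Illustrative: slf4j-jdk14 visible to tests only, so it never shares a
     runtime classpath with jul-to-slf4j. -->
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-jdk14</artifactId>
  <scope>test</scope>
</dependency>
{code}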



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-592) StackOverflowError Failed example/java/WordCount When Using SparkRunner

2016-08-27 Thread Aviem Zur (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15441229#comment-15441229
 ] 

Aviem Zur commented on BEAM-592:


Yeah, I encountered this as well, and that is the way to solve it.

Cause:
jul-to-slf4j.jar and slf4j-jdk14.jar cannot be present simultaneously
http://slf4j.org/legacy.html#jul-to-slf4j

> StackOverflowError Failed example/java/WordCount When Using SparkRunner
> ---
>
> Key: BEAM-592
> URL: https://issues.apache.org/jira/browse/BEAM-592
> Project: Beam
>  Issue Type: Bug
>Reporter: Mark Liu
>Assignee: Mark Liu
> Fix For: 0.3.0-incubating
>
>
> WordCount (example/java/WordCount) failed when run with the SparkRunner using 
> the following command:
> {code}
> mvn clean compile exec:java 
> -Dexec.mainClass=org.apache.beam.examples.WordCount 
> -Dexec.args="--inputFile=/tmp/kinglear.txt --runner=SparkRunner 
> --sparkMaster=local --tempLocation=/tmp/out"
> {code}
> Following is part of the stack trace:
> {code}
> Caused by: java.lang.StackOverflowError
>   at 
> java.util.concurrent.CopyOnWriteArrayList.toArray(CopyOnWriteArrayList.java:374)
>   at java.util.logging.Logger.accessCheckedHandlers(Logger.java:1782)
>   at 
> java.util.logging.LogManager$RootLogger.accessCheckedHandlers(LogManager.java:1668)
>   at java.util.logging.Logger.getHandlers(Logger.java:1776)
>   at java.util.logging.Logger.log(Logger.java:735)
>   at org.slf4j.impl.JDK14LoggerAdapter.log(JDK14LoggerAdapter.java:580)
>   at org.slf4j.impl.JDK14LoggerAdapter.log(JDK14LoggerAdapter.java:650)
>   at 
> org.slf4j.bridge.SLF4JBridgeHandler.callLocationAwareLogger(SLF4JBridgeHandler.java:224)
>   at 
> org.slf4j.bridge.SLF4JBridgeHandler.publish(SLF4JBridgeHandler.java:301)
>   at java.util.logging.Logger.log(Logger.java:738)
>   at org.slf4j.impl.JDK14LoggerAdapter.log(JDK14LoggerAdapter.java:580)
>   at org.slf4j.impl.JDK14LoggerAdapter.log(JDK14LoggerAdapter.java:650)
>   at 
> org.slf4j.bridge.SLF4JBridgeHandler.callLocationAwareLogger(SLF4JBridgeHandler.java:224)
>   at 
> org.slf4j.bridge.SLF4JBridgeHandler.publish(SLF4JBridgeHandler.java:301)
> ...
> {code}
> According to [http://slf4j.org/legacy.html#jul-to-slf4j], and in particular 
> the section: "jul-to-slf4j.jar and slf4j-jdk14.jar cannot be present 
> simultaneously".
> Changing the slf4j-jdk14 dependency scope to test solves the above problem, but 
> WordCountIT still failed for the same reason. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (BEAM-584) Support Verifiers in TestSparkRunner

2016-08-25 Thread Aviem Zur (JIRA)
Aviem Zur created BEAM-584:
--

 Summary: Support Verifiers in TestSparkRunner
 Key: BEAM-584
 URL: https://issues.apache.org/jira/browse/BEAM-584
 Project: Beam
  Issue Type: Improvement
  Components: runner-spark
Reporter: Aviem Zur
Assignee: Amit Sela


[~jasonkuster] suggested that we should support verifiers to better support E2E 
tests.
See 
https://github.com/apache/incubator-beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
 for an example of how they're used and 
https://github.com/apache/incubator-beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
 for how they are implemented in the TestDataflowRunner.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)