[jira] [Commented] (BEAM-5519) Spark Streaming Duplicated Encoding/Decoding Effort

2018-10-01 Thread Amit Sela (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16634849#comment-16634849
 ] 

Amit Sela commented on BEAM-5519:
-

[~winkelman.kyle] you bring up a good point.
IIRC we did all of this to guarantee that when a shuffle is required by Spark, 
we control it (initiate it), and that it only applies to RDDs containing 
serialized data.
This might have been _before_ we got the "force default partitioner" change in 
place, or somewhere in between; I'm not sure.
You can test your change in streaming (which uses {{UpdateStateByKey}} and so 
creates shuffles, etc.) to make sure that no shuffle occurs on RDDs containing 
deserialized data. In addition, you can use a value type that is not 
Kryo-serializable (if the runner still defaults to Kryo underneath) and make 
sure the pipeline doesn't fail.
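As a plain-Spark illustration of the "shuffle serialized data only" idea (a 
standalone sketch, not the runner's actual code; the real runner also encodes 
keys, wrapped so byte equality works):
{code}
// Values are coder-encoded to byte[] before the shuffle, so Spark's own
// serializer (Kryo by default) never has to handle the element type.
import java.util.Arrays;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class EncodedShuffleSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("encoded-shuffle");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      StringUtf8Coder coder = StringUtf8Coder.of();
      long groups = jsc.parallelize(Arrays.asList("a", "b", "a"))
          // encode before the shuffle: only byte[] crosses the wire
          .mapToPair(s -> new Tuple2<>(s, CoderUtils.encodeToByteArray(coder, s)))
          .groupByKey()
          // decode after the shuffle, back on the "deserialized" side
          .mapValues(vs -> CoderUtils.decodeFromByteArray(coder, vs.iterator().next()))
          .count();
      System.out.println("groups: " + groups);   // 2
    }
  }
}
{code}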
Hope that helps!

> Spark Streaming Duplicated Encoding/Decoding Effort
> ---
>
> Key: BEAM-5519
> URL: https://issues.apache.org/jira/browse/BEAM-5519
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Kyle Winkelman
>Assignee: Kyle Winkelman
>Priority: Major
>  Labels: spark, spark-streaming
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> When using the SparkRunner in streaming mode, there is a call to groupByKey 
> followed by a call to updateStateByKey. BEAM-1815 fixed an issue where this 
> used to cause 2 shuffles, but it still causes 2 encode/decode cycles.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module

2017-04-20 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976280#comment-15976280
 ] 

Amit Sela commented on BEAM-1789:
-

I'd start with something less complicated, such as 
{{Repeatedly.forever(AfterPane.elementCountAtLeast(1))}} as the trigger, with 
{{withAllowedLateness(Duration.standardDays(1))}} and {{discardingFiredPanes()}} 
set on the window.
This should fire for each update, allow one day of lateness, and discard fired 
panes.
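Attached to the fixed window from the pipeline in the description, that would 
look roughly like this ({{totalPc}} stands for the reporter's 
{{KV<String, String>}} collection; just a sketch):
{code}
// Fire for every element, allow one day of lateness, discard fired panes.
// Allowed lateness and pane accumulation are set on the Window transform,
// not on the trigger itself.
PCollection<KV<String, String>> windowed = totalPc.apply(
    Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(60)))
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardDays(1))
        .discardingFiredPanes());
{code}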

> window can't not use in spark cluster module
> 
>
> Key: BEAM-1789
> URL: https://issues.apache.org/jira/browse/BEAM-1789
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: tianyou
>Assignee: Amit Sela
>
>  I use Beam on a Spark cluster. The application is below.
> SparkPipelineOptions options =
>     PipelineOptionsFactory.as(SparkPipelineOptions.class);
> options.setRunner(SparkRunner.class);
> options.setEnableSparkMetricSinks(false);
> options.setStreaming(true);
> options.setSparkMaster("spark://10.100.124.205:6066");
> options.setAppName("Beam App Spark" + new Random().nextFloat());
> options.setJobName("Beam Job Spark" + new Random().nextFloat());
> System.out.println("App Name:" + options.getAppName());
> System.out.println("Job Name:" + options.getJobName());
> options.setMaxRecordsPerBatch(10L);
> 
> // PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
> 
> // Duration size = Duration.standardMinutes(4);
> long duration = 60;
> if (args != null && args.length == 1) {
>     duration = Integer.valueOf(args[0]);
> }
> Duration size = Duration.standardSeconds(duration);
> System.out.println("Time window: [" + duration + "] seconds");
> Window.Bound<KV<String, String>> fixWindow =
>     Window.<KV<String, String>>into(FixedWindows.of(size));
> 
> String kafkaAddress = "10.100.124.208:9093";
> // String kafkaAddress = "192.168.100.212:9092";
> 
> Map<String, Object> kfConsunmerConf = new HashMap<String, Object>();
> kfConsunmerConf.put("auto.offset.reset", "latest");
> PCollection<String> kafkaJsonPc = p.apply(KafkaIO.<String, String>read()
>     .withBootstrapServers(kafkaAddress)
>     .withTopics(ImmutableList.of("wypxx1"))
>     .withKeyCoder(StringUtf8Coder.of())
>     .withValueCoder(StringUtf8Coder.of())
>     .updateConsumerProperties(kfConsunmerConf)
>     .withoutMetadata()
> ).apply(Values.<String>create());
> 
> PCollection<KV<String, String>> totalPc = kafkaJsonPc.apply(
>     "count line",
>     ParDo.of(new DoFn<String, KV<String, String>>() {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             String line = c.element();
>             Instant is = c.timestamp();
>             if (line.length() > 2)
>                 line = line.substring(0, 2);
>             System.out.println(line + " " + is.toString());
>             c.output(KV.of(line, line));
>         }
>     })
> );
> 
> PCollection<KV<String, Iterable<String>>> itPc = totalPc.apply(fixWindow).apply(
>     "group by appKey",
>     GroupByKey.create()
> );
> itPc.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         KV<String, Iterable<String>> keyIt = c.element();
>         String key = keyIt.getKey();
>         Iterable<String> itb = keyIt.getValue();
>         Iterator<String> it = itb.iterator();
>         StringBuilder sb = new StringBuilder();
>         sb.append(key).append(":[");
>         while (it.hasNext()) {
>             sb.append(it.next()).append(",");
>         }
>         String str = sb.toString();
>         str = str.substring(0, str.length() - 1) + "]";
>         System.out.println(str);
>         String filePath = "/data/wyp/sparktest.txt";
>         String line = "word-->[" + key + "]total count=" + str + "--->time+" + c.timestamp().toString();
>         System.out.println("writefile->" + line);
>         FileUtil.write(filePath, line, true, true);
>     }
> }));
> 
> p.run().waitUntilFinish();
> When I submit the application to the Spark cluster, in the Spark UI I can see 
> the log of the totalPc PCollection after one minute, but I can't see the log 
> of the itPc PCollection.
> When I use Spark in local mode, it works well.
> Please help me resolve this problem. Thanks!

[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module

2017-04-19 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974601#comment-15974601
 ] 

Amit Sela commented on BEAM-1789:
-

Thanks for clearing this up for me; now I can pinpoint the issue.
Starting from {{0.6.0}}, the Spark runner supports streaming (windows, triggers, 
etc.), but we're still experimenting with it.
Having said that, I don't think what you're experiencing is necessarily an 
issue, but rather behaviour that might be related to the watermark and the 
{{DefaultTrigger}} you're using (by default).
In local mode, propagation of the watermark is probably faster, hence the 
difference from cluster mode.
You could try using a simple "X seconds delay" watermark when reading from 
Kafka.
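For example, something along these lines, assuming the 
{{withTimestampFn}}/{{withWatermarkFn}} hooks on {{KafkaIO.read()}} (the 
10-second delay is arbitrary; this is only a sketch):
{code}
// Stamp each record with its arrival time and report a watermark that lags
// 10 seconds behind, so windows can close even if watermark propagation is
// slow on the cluster.
p.apply(KafkaIO.<String, String>read()
    .withBootstrapServers(kafkaAddress)
    .withTopics(ImmutableList.of("wypxx1"))
    .withKeyCoder(StringUtf8Coder.of())
    .withValueCoder(StringUtf8Coder.of())
    .withTimestampFn(kv -> Instant.now())
    .withWatermarkFn(kv -> Instant.now().minus(Duration.standardSeconds(10)))
    .withoutMetadata());
{code}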
Let me know if it works for you.

> window can't not use in spark cluster module
> 
>
> Key: BEAM-1789
> URL: https://issues.apache.org/jira/browse/BEAM-1789
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: tianyou
>Assignee: Amit Sela
>
>  I use Beam on a Spark cluster. The application is below.
> SparkPipelineOptions options =
>     PipelineOptionsFactory.as(SparkPipelineOptions.class);
> options.setRunner(SparkRunner.class);
> options.setEnableSparkMetricSinks(false);
> options.setStreaming(true);
> options.setSparkMaster("spark://10.100.124.205:6066");
> options.setAppName("Beam App Spark" + new Random().nextFloat());
> options.setJobName("Beam Job Spark" + new Random().nextFloat());
> System.out.println("App Name:" + options.getAppName());
> System.out.println("Job Name:" + options.getJobName());
> options.setMaxRecordsPerBatch(10L);
> 
> // PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
> 
> // Duration size = Duration.standardMinutes(4);
> long duration = 60;
> if (args != null && args.length == 1) {
>     duration = Integer.valueOf(args[0]);
> }
> Duration size = Duration.standardSeconds(duration);
> System.out.println("Time window: [" + duration + "] seconds");
> Window.Bound<KV<String, String>> fixWindow =
>     Window.<KV<String, String>>into(FixedWindows.of(size));
> 
> String kafkaAddress = "10.100.124.208:9093";
> // String kafkaAddress = "192.168.100.212:9092";
> 
> Map<String, Object> kfConsunmerConf = new HashMap<String, Object>();
> kfConsunmerConf.put("auto.offset.reset", "latest");
> PCollection<String> kafkaJsonPc = p.apply(KafkaIO.<String, String>read()
>     .withBootstrapServers(kafkaAddress)
>     .withTopics(ImmutableList.of("wypxx1"))
>     .withKeyCoder(StringUtf8Coder.of())
>     .withValueCoder(StringUtf8Coder.of())
>     .updateConsumerProperties(kfConsunmerConf)
>     .withoutMetadata()
> ).apply(Values.<String>create());
> 
> PCollection<KV<String, String>> totalPc = kafkaJsonPc.apply(
>     "count line",
>     ParDo.of(new DoFn<String, KV<String, String>>() {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             String line = c.element();
>             Instant is = c.timestamp();
>             if (line.length() > 2)
>                 line = line.substring(0, 2);
>             System.out.println(line + " " + is.toString());
>             c.output(KV.of(line, line));
>         }
>     })
> );
> 
> PCollection<KV<String, Iterable<String>>> itPc = totalPc.apply(fixWindow).apply(
>     "group by appKey",
>     GroupByKey.create()
> );
> itPc.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         KV<String, Iterable<String>> keyIt = c.element();
>         String key = keyIt.getKey();
>         Iterable<String> itb = keyIt.getValue();
>         Iterator<String> it = itb.iterator();
>         StringBuilder sb = new StringBuilder();
>         sb.append(key).append(":[");
>         while (it.hasNext()) {
>             sb.append(it.next()).append(",");
>         }
>         String str = sb.toString();
>         str = str.substring(0, str.length() - 1) + "]";
>         System.out.println(str);
>         String filePath = "/data/wyp/sparktest.txt";
>         String line = "word-->[" + key + "]total count=" + str + "--->time+" + c.timestamp().toString();
>         System.out.println("writefile->" + line);
>         FileUtil.write(filePath, line, true, true);
> 

[jira] [Comment Edited] (BEAM-375) HadoopIO and runners-spark conflict with hadoop.version

2017-04-13 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15967634#comment-15967634
 ] 

Amit Sela edited comment on BEAM-375 at 4/13/17 2:16 PM:
-

What do you mean by "blocking"? Do you mean the code execution is blocking?


was (Author: amitsela):
What do you mean by "blocking" ?

> HadoopIO and runners-spark conflict with hadoop.version
> ---
>
> Key: BEAM-375
> URL: https://issues.apache.org/jira/browse/BEAM-375
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Pei He
>Assignee: Pei He
>
> HadoopIO currently uses 2.7.0 and runners-spark uses 2.2.0 for hadoop-client, 
> hadoop-common.
> From [~amitsela]
> "Spark can be built against different hadoop versions, but the release in 
> maven central is a 2.2.0 build (latest). ''
> For HadoopIO, I don't know why 2.7.0 is picked at the beginning. I can check 
> if it will work with 2.2.0.
> I am creating this issue, since I think it there is a general question.
> In principle, HadoopIO and other sdks Sources should work with any runners. 
> But, when one set of runners require version A, but the other set of runners 
> require version B, we will need a general solution for it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-375) HadoopIO and runners-spark conflict with hadoop.version

2017-04-13 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15967634#comment-15967634
 ] 

Amit Sela commented on BEAM-375:


What do you mean by "blocking"?

> HadoopIO and runners-spark conflict with hadoop.version
> ---
>
> Key: BEAM-375
> URL: https://issues.apache.org/jira/browse/BEAM-375
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Pei He
>Assignee: Pei He
>
> HadoopIO currently uses 2.7.0 and runners-spark uses 2.2.0 for hadoop-client, 
> hadoop-common.
> From [~amitsela]
> "Spark can be built against different hadoop versions, but the release in 
> maven central is a 2.2.0 build (latest). ''
> For HadoopIO, I don't know why 2.7.0 is picked at the beginning. I can check 
> if it will work with 2.2.0.
> I am creating this issue, since I think it there is a general question.
> In principle, HadoopIO and other sdks Sources should work with any runners. 
> But, when one set of runners require version A, but the other set of runners 
> require version B, we will need a general solution for it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module

2017-04-12 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15965633#comment-15965633
 ] 

Amit Sela commented on BEAM-1789:
-

Sorry, I meant {{stderr}}; {{stdout}} never shows anything in Spark for some 
reason, and {{System.out}} prints are presented in {{stderr}} as well.

> window can't not use in spark cluster module
> 
>
> Key: BEAM-1789
> URL: https://issues.apache.org/jira/browse/BEAM-1789
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: tianyou
>Assignee: Amit Sela
>
>  I use Beam on a Spark cluster. The application is below.
> SparkPipelineOptions options =
>     PipelineOptionsFactory.as(SparkPipelineOptions.class);
> options.setRunner(SparkRunner.class);
> options.setEnableSparkMetricSinks(false);
> options.setStreaming(true);
> options.setSparkMaster("spark://10.100.124.205:6066");
> options.setAppName("Beam App Spark" + new Random().nextFloat());
> options.setJobName("Beam Job Spark" + new Random().nextFloat());
> System.out.println("App Name:" + options.getAppName());
> System.out.println("Job Name:" + options.getJobName());
> options.setMaxRecordsPerBatch(10L);
> 
> // PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
> 
> // Duration size = Duration.standardMinutes(4);
> long duration = 60;
> if (args != null && args.length == 1) {
>     duration = Integer.valueOf(args[0]);
> }
> Duration size = Duration.standardSeconds(duration);
> System.out.println("Time window: [" + duration + "] seconds");
> Window.Bound<KV<String, String>> fixWindow =
>     Window.<KV<String, String>>into(FixedWindows.of(size));
> 
> String kafkaAddress = "10.100.124.208:9093";
> // String kafkaAddress = "192.168.100.212:9092";
> 
> Map<String, Object> kfConsunmerConf = new HashMap<String, Object>();
> kfConsunmerConf.put("auto.offset.reset", "latest");
> PCollection<String> kafkaJsonPc = p.apply(KafkaIO.<String, String>read()
>     .withBootstrapServers(kafkaAddress)
>     .withTopics(ImmutableList.of("wypxx1"))
>     .withKeyCoder(StringUtf8Coder.of())
>     .withValueCoder(StringUtf8Coder.of())
>     .updateConsumerProperties(kfConsunmerConf)
>     .withoutMetadata()
> ).apply(Values.<String>create());
> 
> PCollection<KV<String, String>> totalPc = kafkaJsonPc.apply(
>     "count line",
>     ParDo.of(new DoFn<String, KV<String, String>>() {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             String line = c.element();
>             Instant is = c.timestamp();
>             if (line.length() > 2)
>                 line = line.substring(0, 2);
>             System.out.println(line + " " + is.toString());
>             c.output(KV.of(line, line));
>         }
>     })
> );
> 
> PCollection<KV<String, Iterable<String>>> itPc = totalPc.apply(fixWindow).apply(
>     "group by appKey",
>     GroupByKey.create()
> );
> itPc.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         KV<String, Iterable<String>> keyIt = c.element();
>         String key = keyIt.getKey();
>         Iterable<String> itb = keyIt.getValue();
>         Iterator<String> it = itb.iterator();
>         StringBuilder sb = new StringBuilder();
>         sb.append(key).append(":[");
>         while (it.hasNext()) {
>             sb.append(it.next()).append(",");
>         }
>         String str = sb.toString();
>         str = str.substring(0, str.length() - 1) + "]";
>         System.out.println(str);
>         String filePath = "/data/wyp/sparktest.txt";
>         String line = "word-->[" + key + "]total count=" + str + "--->time+" + c.timestamp().toString();
>         System.out.println("writefile->" + line);
>         FileUtil.write(filePath, line, true, true);
>     }
> }));
> 
> p.run().waitUntilFinish();
> When I submit the application to the Spark cluster, in the Spark UI I can see 
> the log of the totalPc PCollection after one minute, but I can't see the log 
> of the itPc PCollection.
> When I use Spark in local mode, it works well.
> Please help me resolve this problem. Thanks!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module

2017-04-12 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15965498#comment-15965498
 ] 

Amit Sela commented on BEAM-1789:
-

I didn't get a chance to run this on a cluster, but if I understand correctly, 
the issue you're describing is that you expect your {{System.out.println}} 
output to show up in the logs and it doesn't, correct?
If so, on a cluster (unlike local mode) you need to go through the Spark UI and 
look in the executors tab for {{stdout}}.

> window can't not use in spark cluster module
> 
>
> Key: BEAM-1789
> URL: https://issues.apache.org/jira/browse/BEAM-1789
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: tianyou
>Assignee: Amit Sela
>
>  I use Beam on a Spark cluster. The application is below.
> SparkPipelineOptions options =
>     PipelineOptionsFactory.as(SparkPipelineOptions.class);
> options.setRunner(SparkRunner.class);
> options.setEnableSparkMetricSinks(false);
> options.setStreaming(true);
> options.setSparkMaster("spark://10.100.124.205:6066");
> options.setAppName("Beam App Spark" + new Random().nextFloat());
> options.setJobName("Beam Job Spark" + new Random().nextFloat());
> System.out.println("App Name:" + options.getAppName());
> System.out.println("Job Name:" + options.getJobName());
> options.setMaxRecordsPerBatch(10L);
> 
> // PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
> 
> // Duration size = Duration.standardMinutes(4);
> long duration = 60;
> if (args != null && args.length == 1) {
>     duration = Integer.valueOf(args[0]);
> }
> Duration size = Duration.standardSeconds(duration);
> System.out.println("Time window: [" + duration + "] seconds");
> Window.Bound<KV<String, String>> fixWindow =
>     Window.<KV<String, String>>into(FixedWindows.of(size));
> 
> String kafkaAddress = "10.100.124.208:9093";
> // String kafkaAddress = "192.168.100.212:9092";
> 
> Map<String, Object> kfConsunmerConf = new HashMap<String, Object>();
> kfConsunmerConf.put("auto.offset.reset", "latest");
> PCollection<String> kafkaJsonPc = p.apply(KafkaIO.<String, String>read()
>     .withBootstrapServers(kafkaAddress)
>     .withTopics(ImmutableList.of("wypxx1"))
>     .withKeyCoder(StringUtf8Coder.of())
>     .withValueCoder(StringUtf8Coder.of())
>     .updateConsumerProperties(kfConsunmerConf)
>     .withoutMetadata()
> ).apply(Values.<String>create());
> 
> PCollection<KV<String, String>> totalPc = kafkaJsonPc.apply(
>     "count line",
>     ParDo.of(new DoFn<String, KV<String, String>>() {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             String line = c.element();
>             Instant is = c.timestamp();
>             if (line.length() > 2)
>                 line = line.substring(0, 2);
>             System.out.println(line + " " + is.toString());
>             c.output(KV.of(line, line));
>         }
>     })
> );
> 
> PCollection<KV<String, Iterable<String>>> itPc = totalPc.apply(fixWindow).apply(
>     "group by appKey",
>     GroupByKey.create()
> );
> itPc.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         KV<String, Iterable<String>> keyIt = c.element();
>         String key = keyIt.getKey();
>         Iterable<String> itb = keyIt.getValue();
>         Iterator<String> it = itb.iterator();
>         StringBuilder sb = new StringBuilder();
>         sb.append(key).append(":[");
>         while (it.hasNext()) {
>             sb.append(it.next()).append(",");
>         }
>         String str = sb.toString();
>         str = str.substring(0, str.length() - 1) + "]";
>         System.out.println(str);
>         String filePath = "/data/wyp/sparktest.txt";
>         String line = "word-->[" + key + "]total count=" + str + "--->time+" + c.timestamp().toString();
>         System.out.println("writefile->" + line);
>         FileUtil.write(filePath, line, true, true);
>     }
> }));
> 
> p.run().waitUntilFinish();
> When I submit the application to the Spark cluster, in the Spark UI I can see 
> the log of the totalPc PCollection after one minute, but I can't see the log 
> of the itPc PCollection.
> I use local mode 

[jira] [Commented] (BEAM-1920) Add Spark 2.x support in Spark runner

2017-04-09 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15962076#comment-15962076
 ] 

Amit Sela commented on BEAM-1920:
-

Sweet! 
I'm considering making Spark 2.x the default and {{1.6.3}} the profile-enabled 
version, because there are many flaky-test issues now that I believe should be 
resolved by the bug fixes introduced in Spark {{2.0.0}} and {{2.0.1}}.

The only con I can think of is that a Spark 2.x dependency would provide an 
easy path to new, incompatible Spark APIs, so we'd have to think carefully 
about how we test for {{1.6.3}} breaking changes.

> Add Spark 2.x support in Spark runner
> -
>
> Key: BEAM-1920
> URL: https://issues.apache.org/jira/browse/BEAM-1920
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Jean-Baptiste Onofré
>Assignee: Jean-Baptiste Onofré
>
> I have a branch working with both Spark 1 and Spark 2 backend.
> I'm preparing a pull request about that.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1737) Implement a Single-output ParDo as a Multi-output ParDo with a single output

2017-04-09 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1737.
-
   Resolution: Fixed
Fix Version/s: First stable release

> Implement a Single-output ParDo as a Multi-output ParDo with a single output
> 
>
> Key: BEAM-1737
> URL: https://issues.apache.org/jira/browse/BEAM-1737
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Thomas Groh
>Assignee: Amit Sela
>Priority: Minor
> Fix For: First stable release
>
>
> This is the cause of having a separate path and implementation for 
> single-output ParDos, even though both go through the same translator.
> Partial stacktrace:
> Tests run: 9, Failures: 0, Errors: 6, Skipped: 0, Time elapsed: 5.946 sec <<< 
> FAILURE! - in 
> org.apache.beam.runners.spark.translation.streaming.CreateStreamTest  
>[8233/41535]
> testLateDataAccumulating(org.apache.beam.runners.spark.translation.streaming.CreateStreamTest)
>   Time elapsed: 3.593 sec  <<< ERROR!
> java.lang.RuntimeException: 
> java.io.NotSerializableException: DStream checkpointing has been enabled but 
> the DStreams with their functions are not serializable
> org.apache.beam.runners.spark.translation.EvaluationContext
> Serialization stack:
> - object not serializable (class: 
> org.apache.beam.runners.spark.translation.EvaluationContext, value: 
> org.apache.beam.runners.spark.translation.EvaluationContext@a8c55d7)
> - field (class: 
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1,
>  name: val$context, type: class 
> org.apache.beam.runners.spark.translation.EvaluationContext)
> - object (class 
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1,
>  
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1@44f50940)
> - field (class: 
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1,
>  name: transformFunc$3, type: interface 
> org.apache.spark.api.java.function.Function)
> - object (class 
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1,
>  )
> - field (class: 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21,
>  name: cleanedF$2, type: interface scala.Function1)
> - object (class 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21,
>  )
> - field (class: 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, 
> name: cleanedF$3, type: interface scala.Function2)
> - object (class 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, 
> )
> - writeObject data (class: 
> org.apache.spark.streaming.dstream.DStreamCheckpointData)
> - object (class 
> org.apache.spark.streaming.dstream.DStreamCheckpointData, [
> 0 checkpoint files 
> ])
> 
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
> at 
> org.apache.beam.runners.spark.TestSparkRunner.awaitWatermarksOrTimeout(TestSparkRunner.java:195)
> at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:127)
> at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:82)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
> ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-981) Not possible to directly submit a pipeline on spark cluster

2017-04-05 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957480#comment-15957480
 ] 

Amit Sela commented on BEAM-981:


[~iemejia] would you mind taking a look at this one? We should be able to 
submit programmatically to a Spark cluster.

> Not possible to directly submit a pipeline on spark cluster
> ---
>
> Key: BEAM-981
> URL: https://issues.apache.org/jira/browse/BEAM-981
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Affects Versions: 0.3.0-incubating, 0.4.0
>Reporter: Jean-Baptiste Onofré
>
> It's not possible to directly run a pipeline on the Spark runner (for 
> instance using {{mvn exec:java}}). It fails with:
> {code}
> [appclient-register-master-threadpool-0] INFO 
> org.apache.spark.deploy.client.AppClient$ClientEndpoint - Connecting to 
> master spark://10.200.118.197:7077...
> [shuffle-client-0] ERROR org.apache.spark.network.client.TransportClient - 
> Failed to send RPC 6813731522650020739 to /10.200.118.197:7077: 
> java.lang.AbstractMethodError: 
> org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
> java.lang.AbstractMethodError: 
> org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
> at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:73)
> at 
> io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:107)
> at 
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:820)
> at 
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:733)
> at 
> io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:111)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:748)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:740)
> at 
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:826)
> at 
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:733)
> at 
> io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:284)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:748)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:740)
> at 
> io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
> at 
> io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1101)
> at 
> io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1148)
> at 
> io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1090)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.safeExecute(SingleThreadEventExecutor.java:451)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:401)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877)
> at java.lang.Thread.run(Thread.java:745)
> [appclient-register-master-threadpool-0] WARN 
> org.apache.spark.deploy.client.AppClient$ClientEndpoint - Failed to connect 
> to master 10.200.118.197:7077
> java.io.IOException: Failed to send RPC 6813731522650020739 to 
> /10.200.118.197:7077: java.lang.AbstractMethodError: 
> org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
> at 
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
> at 
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
> at 
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:514)
> at 
> io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:507)
> at 
> io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:486)
> at 
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:427)
> at 
> io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:129)
> at 
> io.netty.channel.AbstractChannelHandlerContext.notifyOutboundHandlerException(AbstractChannelHandlerContext.java:845)
> at 
> io.netty.channel.Ab

[jira] [Commented] (BEAM-1737) Implement a Single-output ParDo as a Multi-output ParDo with a single output

2017-04-05 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956637#comment-15956637
 ] 

Amit Sela commented on BEAM-1737:
-

Changed the title since the change will actually do just that.

> Implement a Single-output ParDo as a Multi-output ParDo with a single output
> 
>
> Key: BEAM-1737
> URL: https://issues.apache.org/jira/browse/BEAM-1737
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Thomas Groh
>Assignee: Amit Sela
>Priority: Minor
>
> This is the cause of having a separate path and implementation for 
> single-output ParDos, even though both go through the same translator.
> Partial stacktrace:
> Tests run: 9, Failures: 0, Errors: 6, Skipped: 0, Time elapsed: 5.946 sec <<< 
> FAILURE! - in 
> org.apache.beam.runners.spark.translation.streaming.CreateStreamTest  
>[8233/41535]
> testLateDataAccumulating(org.apache.beam.runners.spark.translation.streaming.CreateStreamTest)
>   Time elapsed: 3.593 sec  <<< ERROR!
> java.lang.RuntimeException: 
> java.io.NotSerializableException: DStream checkpointing has been enabled but 
> the DStreams with their functions are not serializable
> org.apache.beam.runners.spark.translation.EvaluationContext
> Serialization stack:
> - object not serializable (class: 
> org.apache.beam.runners.spark.translation.EvaluationContext, value: 
> org.apache.beam.runners.spark.translation.EvaluationContext@a8c55d7)
> - field (class: 
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1,
>  name: val$context, type: class 
> org.apache.beam.runners.spark.translation.EvaluationContext)
> - object (class 
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1,
>  
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1@44f50940)
> - field (class: 
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1,
>  name: transformFunc$3, type: interface 
> org.apache.spark.api.java.function.Function)
> - object (class 
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1,
>  )
> - field (class: 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21,
>  name: cleanedF$2, type: interface scala.Function1)
> - object (class 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21,
>  )
> - field (class: 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, 
> name: cleanedF$3, type: interface scala.Function2)
> - object (class 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, 
> )
> - writeObject data (class: 
> org.apache.spark.streaming.dstream.DStreamCheckpointData)
> - object (class 
> org.apache.spark.streaming.dstream.DStreamCheckpointData, [
> 0 checkpoint files 
> ])
> 
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
> at 
> org.apache.beam.runners.spark.TestSparkRunner.awaitWatermarksOrTimeout(TestSparkRunner.java:195)
> at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:127)
> at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:82)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
> ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-1737) Implement a Single-output ParDo as a Multi-output ParDo with a single output

2017-04-05 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela updated BEAM-1737:

Summary: Implement a Single-output ParDo as a Multi-output ParDo with a 
single output  (was: Interpreting a Single-output ParDo as a Multi-output ParDo 
with a single output causes serialization failures)

> Implement a Single-output ParDo as a Multi-output ParDo with a single output
> 
>
> Key: BEAM-1737
> URL: https://issues.apache.org/jira/browse/BEAM-1737
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Thomas Groh
>Assignee: Amit Sela
>Priority: Minor
>
> This is the cause of having a separate path and implementation for 
> single-output ParDos, even though both go through the same translator.
> Partial stacktrace:
> Tests run: 9, Failures: 0, Errors: 6, Skipped: 0, Time elapsed: 5.946 sec <<< 
> FAILURE! - in 
> org.apache.beam.runners.spark.translation.streaming.CreateStreamTest  
>[8233/41535]
> testLateDataAccumulating(org.apache.beam.runners.spark.translation.streaming.CreateStreamTest)
>   Time elapsed: 3.593 sec  <<< ERROR!
> java.lang.RuntimeException: 
> java.io.NotSerializableException: DStream checkpointing has been enabled but 
> the DStreams with their functions are not serializable
> org.apache.beam.runners.spark.translation.EvaluationContext
> Serialization stack:
> - object not serializable (class: 
> org.apache.beam.runners.spark.translation.EvaluationContext, value: 
> org.apache.beam.runners.spark.translation.EvaluationContext@a8c55d7)
> - field (class: 
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1,
>  name: val$context, type: class 
> org.apache.beam.runners.spark.translation.EvaluationContext)
> - object (class 
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1,
>  
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1@44f50940)
> - field (class: 
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1,
>  name: transformFunc$3, type: interface 
> org.apache.spark.api.java.function.Function)
> - object (class 
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1,
>  )
> - field (class: 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21,
>  name: cleanedF$2, type: interface scala.Function1)
> - object (class 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21,
>  )
> - field (class: 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, 
> name: cleanedF$3, type: interface scala.Function2)
> - object (class 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, 
> )
> - writeObject data (class: 
> org.apache.spark.streaming.dstream.DStreamCheckpointData)
> - object (class 
> org.apache.spark.streaming.dstream.DStreamCheckpointData, [
> 0 checkpoint files 
> ])
> 
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
> at 
> org.apache.beam.runners.spark.TestSparkRunner.awaitWatermarksOrTimeout(TestSparkRunner.java:195)
> at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:127)
> at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:82)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
> ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (BEAM-1737) Interpreting a Single-output ParDo as a Multi-output ParDo with a single output causes serialization failures

2017-04-05 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela reassigned BEAM-1737:
---

Assignee: Amit Sela

> Interpreting a Single-output ParDo as a Multi-output ParDo with a single 
> output causes serialization failures
> -
>
> Key: BEAM-1737
> URL: https://issues.apache.org/jira/browse/BEAM-1737
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Thomas Groh
>Assignee: Amit Sela
>Priority: Minor
>
> This is the cause of having a separate path and implementation for 
> single-output ParDos, even though both go through the same translator.
> Partial stacktrace:
> Tests run: 9, Failures: 0, Errors: 6, Skipped: 0, Time elapsed: 5.946 sec <<< 
> FAILURE! - in 
> org.apache.beam.runners.spark.translation.streaming.CreateStreamTest  
>[8233/41535]
> testLateDataAccumulating(org.apache.beam.runners.spark.translation.streaming.CreateStreamTest)
>   Time elapsed: 3.593 sec  <<< ERROR!
> java.lang.RuntimeException: 
> java.io.NotSerializableException: DStream checkpointing has been enabled but 
> the DStreams with their functions are not serializable
> org.apache.beam.runners.spark.translation.EvaluationContext
> Serialization stack:
> - object not serializable (class: 
> org.apache.beam.runners.spark.translation.EvaluationContext, value: 
> org.apache.beam.runners.spark.translation.EvaluationContext@a8c55d7)
> - field (class: 
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1,
>  name: val$context, type: class 
> org.apache.beam.runners.spark.translation.EvaluationContext)
> - object (class 
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1,
>  
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1@44f50940)
> - field (class: 
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1,
>  name: transformFunc$3, type: interface 
> org.apache.spark.api.java.function.Function)
> - object (class 
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1,
>  )
> - field (class: 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21,
>  name: cleanedF$2, type: interface scala.Function1)
> - object (class 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21,
>  )
> - field (class: 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, 
> name: cleanedF$3, type: interface scala.Function2)
> - object (class 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, 
> )
> - writeObject data (class: 
> org.apache.spark.streaming.dstream.DStreamCheckpointData)
> - object (class 
> org.apache.spark.streaming.dstream.DStreamCheckpointData, [
> 0 checkpoint files 
> ])
> 
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
> at 
> org.apache.beam.runners.spark.TestSparkRunner.awaitWatermarksOrTimeout(TestSparkRunner.java:195)
> at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:127)
> at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:82)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
> ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1875) Remove Spark runner custom Hadoop and Avro IOs.

2017-04-04 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1875.
-
   Resolution: Fixed
Fix Version/s: First stable release

> Remove Spark runner custom Hadoop and Avro IOs.
> ---
>
> Key: BEAM-1875
> URL: https://issues.apache.org/jira/browse/BEAM-1875
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
>Priority: Minor
> Fix For: First stable release
>
>
> Remove custom IOs, their evaluators, tests and usage.
> The runner relies on the SDK, and its tests, for all IOs.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1737) Interpreting a Single-output ParDo as a Multi-output ParDo with a single output causes serialization failures

2017-04-04 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954833#comment-15954833
 ] 

Amit Sela commented on BEAM-1737:
-

This is simply because you are not allowed to use non-Serializable objects such 
as {{EvaluationContext}} inside {{DStream}} lambdas.
This line is simply not necessary and can be removed: 
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java#L436

The step name is set before calling any {{DStream}} function, for both single 
and multi output.
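As a generic illustration of the rule (plain Spark Streaming, not the runner's 
translator; {{NonSerializableContext}} stands in for {{EvaluationContext}} and 
all names are made up for the sketch):
{code}
// Only serializable values may be captured by a DStream closure once
// checkpointing is enabled; extract the String before the closure instead of
// referencing the non-serializable holder inside it.
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ClosureCaptureSketch {
  static final class NonSerializableContext {      // not Serializable, like EvaluationContext
    final String currentStepName = "ParDo(MyFn)";
  }

  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("closure-capture");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
    jssc.checkpoint("/tmp/closure-capture");        // DStream graph must now serialize

    NonSerializableContext context = new NonSerializableContext();
    // Extract the serializable piece *before* the closure; capturing `context`
    // inside transform(...) would fail the checkpoint serialization check.
    final String stepName = context.currentStepName;

    jssc.socketTextStream("localhost", 9999)        // placeholder input
        .transform(rdd -> rdd.map(v -> stepName + ": " + v))
        .print();

    jssc.start();                                   // graph is validated/serialized here
    jssc.awaitTerminationOrTimeout(3000);
    jssc.stop();
  }
}
{code}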

I'll have a fix ready.

> Interpreting a Single-output ParDo as a Multi-output ParDo with a single 
> output causes serialization failures
> -
>
> Key: BEAM-1737
> URL: https://issues.apache.org/jira/browse/BEAM-1737
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Thomas Groh
>Priority: Minor
>
> This is the cause of having a separate path and implementation for 
> single-output ParDos, even though both go through the same translator.
> Partial stacktrace:
> Tests run: 9, Failures: 0, Errors: 6, Skipped: 0, Time elapsed: 5.946 sec <<< 
> FAILURE! - in 
> org.apache.beam.runners.spark.translation.streaming.CreateStreamTest  
>[8233/41535]
> testLateDataAccumulating(org.apache.beam.runners.spark.translation.streaming.CreateStreamTest)
>   Time elapsed: 3.593 sec  <<< ERROR!
> java.lang.RuntimeException: 
> java.io.NotSerializableException: DStream checkpointing has been enabled but 
> the DStreams with their functions are not serializable
> org.apache.beam.runners.spark.translation.EvaluationContext
> Serialization stack:
> - object not serializable (class: 
> org.apache.beam.runners.spark.translation.EvaluationContext, value: 
> org.apache.beam.runners.spark.translation.EvaluationContext@a8c55d7)
> - field (class: 
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1,
>  name: val$context, type: class 
> org.apache.beam.runners.spark.translation.EvaluationContext)
> - object (class 
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1,
>  
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1@44f50940)
> - field (class: 
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1,
>  name: transformFunc$3, type: interface 
> org.apache.spark.api.java.function.Function)
> - object (class 
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1,
>  )
> - field (class: 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21,
>  name: cleanedF$2, type: interface scala.Function1)
> - object (class 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21,
>  )
> - field (class: 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, 
> name: cleanedF$3, type: interface scala.Function2)
> - object (class 
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, 
> )
> - writeObject data (class: 
> org.apache.spark.streaming.dstream.DStreamCheckpointData)
> - object (class 
> org.apache.spark.streaming.dstream.DStreamCheckpointData, [
> 0 checkpoint files 
> ])
> 
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
> at 
> org.apache.beam.runners.spark.TestSparkRunner.awaitWatermarksOrTimeout(TestSparkRunner.java:195)
> at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:127)
> at 
> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:82)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
> ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1875) Remove Spark runner custom Hadoop and Avro IOs.

2017-04-04 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1875:
---

 Summary: Remove Spark runner custom Hadoop and Avro IOs.
 Key: BEAM-1875
 URL: https://issues.apache.org/jira/browse/BEAM-1875
 Project: Beam
  Issue Type: Improvement
  Components: runner-spark
Reporter: Amit Sela
Assignee: Amit Sela
Priority: Minor


Remove custom IOs, their evaluators, tests and usage.
The runner relies on the SDK, and its tests, for all IOs.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-848) Shuffle input read-values to get maximum parallelism.

2017-04-03 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15953760#comment-15953760
 ] 

Amit Sela commented on BEAM-848:


Agreed; closed as invalid.
If use cases prove the need, we can consider a "repartition" to maximum 
parallelism (or a pre-set one) after the read.
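A sketch of that alternative, for reference (plain Spark; the input path and 
target parallelism are illustrative):
{code}
// Spread the read values across the full default parallelism (or a pre-set
// value) right after the read, before any heavy processing.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RepartitionAfterRead {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("repartition-after-read");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      JavaRDD<String> read = jsc.textFile("/tmp/input.txt");
      JavaRDD<String> spread = read.repartition(jsc.defaultParallelism());
      System.out.println("partitions: " + spread.getNumPartitions());
    }
  }
}
{code}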

> Shuffle input read-values to get maximum parallelism.
> -
>
> Key: BEAM-848
> URL: https://issues.apache.org/jira/browse/BEAM-848
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Aviem Zur
> Fix For: First stable release
>
>
> It would be wise to shuffle the read values _after_ flatmap to increase 
> parallelism in processing of the data.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (BEAM-848) Shuffle input read-values to get maximum parallelism.

2017-04-03 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela closed BEAM-848.
--
   Resolution: Invalid
Fix Version/s: First stable release

> Shuffle input read-values to get maximum parallelism.
> -
>
> Key: BEAM-848
> URL: https://issues.apache.org/jira/browse/BEAM-848
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Aviem Zur
> Fix For: First stable release
>
>
> It would be wise to shuffle the read values _after_ flatmap to increase 
> parallelism in processing of the data.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (BEAM-696) Document: Side-Inputs non-deterministic with merging main-input windows

2017-03-30 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela closed BEAM-696.
--
   Resolution: Not A Problem
Fix Version/s: Not applicable

Resolved by providing better documentation. See comments for details. 

> Document: Side-Inputs non-deterministic with merging main-input windows
> ---
>
> Key: BEAM-696
> URL: https://issues.apache.org/jira/browse/BEAM-696
> Project: Beam
>  Issue Type: Task
>  Components: beam-model
>Reporter: Ben Chambers
>Assignee: Amit Sela
> Fix For: Not applicable
>
>
> Side-Inputs are non-deterministic for several reasons:
> 1. Because they depend on triggering of the side-input (this is acceptable 
> because triggers are by their nature non-deterministic).
> 2. They depend on the current state of the main-input window in order to 
> look up the side-input. This means that with merging windows the lookup 
> itself is non-deterministic.
> 3. Any runner optimizations that affect when the side-input is looked up may 
> cause problems with either or both of these.
> This issue focuses on #2 -- the non-determinism of side-inputs that execute 
> within a Merging WindowFn.
> Possible solution would be to defer running anything that looks up the 
> side-input until we need to extract an output, and using the main-window at 
> that point. Specifically, if the main-window is a MergingWindowFn, don't 
> execute any kind of pre-combine, instead buffer all the inputs and combine 
> later.
> This could still run into some non-determinism if there are triggers 
> controlling when we extract output.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1827) Fix use of deprecated Spark APIs in the runner.

2017-03-29 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1827.
-
   Resolution: Fixed
Fix Version/s: First stable release

> Fix use of deprecated Spark APIs in the runner. 
> 
>
> Key: BEAM-1827
> URL: https://issues.apache.org/jira/browse/BEAM-1827
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
>Priority: Minor
> Fix For: First stable release
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1827) Fix use of deprecated Spark APIs in the runner.

2017-03-29 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1827:
---

 Summary: Fix use of deprecated Spark APIs in the runner. 
 Key: BEAM-1827
 URL: https://issues.apache.org/jira/browse/BEAM-1827
 Project: Beam
  Issue Type: Improvement
  Components: runner-spark
Reporter: Amit Sela
Assignee: Amit Sela
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1815) Avoid shuffling twice in GABW

2017-03-28 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1815.
-
   Resolution: Fixed
Fix Version/s: First stable release

> Avoid shuffling twice in GABW
> -
>
> Key: BEAM-1815
> URL: https://issues.apache.org/jira/browse/BEAM-1815
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
> Fix For: First stable release
>
>
> Spark runner implementation of GABW includes a "built-in" groupByKey, but 
> BOBK before it already groups, so in order to avoid an unnecessary shuffle we 
> need to force a {{Partitioner}} on the RDDs involved. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (BEAM-1717) Maven release/deploy tries to uploads some artifacts more than once

2017-03-27 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15943698#comment-15943698
 ] 

Amit Sela edited comment on BEAM-1717 at 3/27/17 5:46 PM:
--

[~davor] {{beam-sdks-java-core}} is OK now, but the release now fails on a 
duplicate tests jar in the Dataflow runner module. I'll dig back in later this 
week, but maybe you could also take a look there and see if something catches 
your attention.
We're working our way through this one nicely, module by module; just a little 
bit more effort and we'll get there ;)


was (Author: amitsela):
[~davor] {{beam-sdks-java-core}} is OK now, but now the release fails on 
duplicate in the Dataflow runner module, I'll dig back in later this week but 
maybe you could also take a look there and see if something catches your 
attention.
We're working our way through this one nicely module-by-module, just a little 
bit more effort and we'll get there ;)

> Maven release/deploy tries to uploads some artifacts more than once 
> 
>
> Key: BEAM-1717
> URL: https://issues.apache.org/jira/browse/BEAM-1717
> Project: Beam
>  Issue Type: Bug
>  Components: build-system
>Reporter: Amit Sela
>Assignee: Amit Sela
>Priority: Minor
> Fix For: First stable release
>
>
> Running maven {{release}} or {{deploy}} causes some artifacts to deploy more 
> than once which fails deployments to release Nexus.
> While this is not an issue for the Apache release process (because it uses a 
> staging Nexus), this affects users who wish to deploy their own fork. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1717) Maven release/deploy tries to uploads some artifacts more than once

2017-03-27 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15943698#comment-15943698
 ] 

Amit Sela commented on BEAM-1717:
-

[~davor] {{beam-sdks-java-core}} is OK now, but now the release fails on 
duplicate in the Dataflow runner module, I'll dig back in later this week but 
maybe you could also take a look there and see if something catches your 
attention.
We're working our way through this one nicely module-by-module, just a little 
bit more effort and we'll get there ;)

> Maven release/deploy tries to uploads some artifacts more than once 
> 
>
> Key: BEAM-1717
> URL: https://issues.apache.org/jira/browse/BEAM-1717
> Project: Beam
>  Issue Type: Bug
>  Components: build-system
>Reporter: Amit Sela
>Assignee: Amit Sela
>Priority: Minor
> Fix For: First stable release
>
>
> Running maven {{release}} or {{deploy}} causes some artifacts to deploy more 
> than once which fails deployments to release Nexus.
> While this is not an issue for the Apache release process (because it uses a 
> staging Nexus), this affects users who wish to deploy their own fork. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1815) Avoid shuffling twice in GABW

2017-03-27 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1815:
---

 Summary: Avoid shuffling twice in GABW
 Key: BEAM-1815
 URL: https://issues.apache.org/jira/browse/BEAM-1815
 Project: Beam
  Issue Type: Bug
  Components: runner-spark
Reporter: Amit Sela
Assignee: Amit Sela


Spark runner implementation of GABW includes a "built-in" groupByKey, but BOBK 
before it already groups, so in order to avoid an unnecessary shuffle we need 
to force a {{Partitioner}} on the RDDs involved. 
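As a plain-Spark illustration of why forcing the {{Partitioner}} helps 
(standalone sketch, not the runner's GABW code):
{code}
// When the pair RDD is already partitioned and groupByKey() reuses the very
// same Partitioner instance, Spark sees the existing partitioning and skips
// the second shuffle.
import java.util.Arrays;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class ForcedPartitionerSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("forced-partitioner");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      Partitioner partitioner = new HashPartitioner(4);
      long groups = jsc
          .parallelizePairs(Arrays.asList(
              new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3)))
          .partitionBy(partitioner)     // the one, controlled shuffle
          .mapValues(v -> v + 1)        // mapValues preserves the partitioning
          .groupByKey(partitioner)      // same partitioner => no extra shuffle
          .count();
      System.out.println("groups: " + groups);    // 2
    }
  }
}
{code}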



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1802) Spark Runner does not shutdown correctly when executing multiple pipelines in sequence

2017-03-25 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1802.
-
   Resolution: Fixed
Fix Version/s: First stable release

> Spark Runner does not shutdown correctly when executing multiple pipelines in 
> sequence
> --
>
> Key: BEAM-1802
> URL: https://issues.apache.org/jira/browse/BEAM-1802
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Ismaël Mejía
>Assignee: Aviem Zur
> Fix For: First stable release
>
>
> I found this while running the Nexmark queries in sequence in local mode. I 
> had the correct configuration but it didn't seem to work.
> 17/03/24 12:07:49 WARN org.apache.spark.SparkContext: Multiple running 
> SparkContexts detected in the same JVM!
> org.apache.spark.SparkException: Only one SparkContext may be running in this 
> JVM (see SPARK-2243). To ignore this error, set 
> spark.driver.allowMultipleContexts = true. The currently running SparkContext 
> was created at:
> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:59)
> org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:100)
> org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:69)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:206)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:91)
> org.apache.beam.sdk.Pipeline.run(Pipeline.java:266)
> org.apache.beam.integration.nexmark.NexmarkRunner.run(NexmarkRunner.java:1233)
> org.apache.beam.integration.nexmark.NexmarkDriver.runAll(NexmarkDriver.java:69)
> org.apache.beam.integration.nexmark.drivers.NexmarkSparkDriver.main(NexmarkSparkDriver.java:46)
>   at 
> org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2257)
>   at 
> org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2239)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1802) Spark Runner does not shutdown correctly when executing multiple pipelines in sequence

2017-03-24 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940422#comment-15940422
 ] 

Amit Sela commented on BEAM-1802:
-

{{PipelineResult#cancel()}} should do the trick for now
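
A hedged sketch of how that could look between sequential runs (names and the 
surrounding driver code are illustrative, not the Nexmark driver itself):

{code}
// Hypothetical example: cancel the result so the runner can release its
// SparkContext before the next pipeline is created.
void runSequentially(SparkPipelineOptions options) throws java.io.IOException {
  Pipeline first = Pipeline.create(options);
  // ... build the first pipeline ...
  PipelineResult firstResult = first.run();
  firstResult.waitUntilFinish();
  firstResult.cancel();

  Pipeline second = Pipeline.create(options);
  // ... build the second pipeline ...
  second.run().waitUntilFinish();
}
{code}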

> Spark Runner does not shutdown correctly when executing multiple pipelines in 
> sequence
> --
>
> Key: BEAM-1802
> URL: https://issues.apache.org/jira/browse/BEAM-1802
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Ismaël Mejía
>Assignee: Aviem Zur
>
> I found this while running the Nexmark queries in sequence in local mode. I 
> had the correct configuration but it didn't seem to work.
> 17/03/24 12:07:49 WARN org.apache.spark.SparkContext: Multiple running 
> SparkContexts detected in the same JVM!
> org.apache.spark.SparkException: Only one SparkContext may be running in this 
> JVM (see SPARK-2243). To ignore this error, set 
> spark.driver.allowMultipleContexts = true. The currently running SparkContext 
> was created at:
> org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
> org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:100)
> org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:69)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:206)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:91)
> org.apache.beam.sdk.Pipeline.run(Pipeline.java:266)
> org.apache.beam.integration.nexmark.NexmarkRunner.run(NexmarkRunner.java:1233)
> org.apache.beam.integration.nexmark.NexmarkDriver.runAll(NexmarkDriver.java:69)
> org.apache.beam.integration.nexmark.drivers.NexmarkSparkDriver.main(NexmarkSparkDriver.java:46)
>   at 
> org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2257)
>   at 
> org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2239)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1802) Spark Runner does not shutdown correctly when executing multiple pipelines in sequence

2017-03-24 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940351#comment-15940351
 ] 

Amit Sela commented on BEAM-1802:
-

The only valid scenario in which to expect the context to terminate is in batch 
jobs when blocking on {{waitUntilFinish()}} without a timeout.

> Spark Runner does not shutdown correctly when executing multiple pipelines in 
> sequence
> --
>
> Key: BEAM-1802
> URL: https://issues.apache.org/jira/browse/BEAM-1802
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Ismaël Mejía
>Assignee: Aviem Zur
>
> I found this while running the Nexmark queries in sequence in local mode. I 
> had the correct configuration but it didn't seem to work.
> 17/03/24 12:07:49 WARN org.apache.spark.SparkContext: Multiple running 
> SparkContexts detected in the same JVM!
> org.apache.spark.SparkException: Only one SparkContext may be running in this 
> JVM (see SPARK-2243). To ignore this error, set 
> spark.driver.allowMultipleContexts = true. The currently running SparkContext 
> was created at:
> org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
> org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:100)
> org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:69)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:206)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:91)
> org.apache.beam.sdk.Pipeline.run(Pipeline.java:266)
> org.apache.beam.integration.nexmark.NexmarkRunner.run(NexmarkRunner.java:1233)
> org.apache.beam.integration.nexmark.NexmarkDriver.runAll(NexmarkDriver.java:69)
> org.apache.beam.integration.nexmark.drivers.NexmarkSparkDriver.main(NexmarkSparkDriver.java:46)
>   at 
> org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2257)
>   at 
> org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2239)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1802) Spark Runner does not shutdown correctly when executing multiple pipelines in sequence

2017-03-24 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940266#comment-15940266
 ] 

Amit Sela commented on BEAM-1802:
-

[~iemejia] do you stop the pipeline between executions ? 
{code}
Pipeline p = ...
p.apply(...).apply(...);
p.run().waitUntilFinish();
// do you call stop() ?
{code}

> Spark Runner does not shutdown correctly when executing multiple pipelines in 
> sequence
> --
>
> Key: BEAM-1802
> URL: https://issues.apache.org/jira/browse/BEAM-1802
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Ismaël Mejía
>Assignee: Aviem Zur
>
> I found this while running the Nexmark queries in sequence in local mode. I 
> had the correct configuration but it didn't seem to work.
> 17/03/24 12:07:49 WARN org.apache.spark.SparkContext: Multiple running 
> SparkContexts detected in the same JVM!
> org.apache.spark.SparkException: Only one SparkContext may be running in this 
> JVM (see SPARK-2243). To ignore this error, set 
> spark.driver.allowMultipleContexts = true. The currently running SparkContext 
> was created at:
> org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
> org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:100)
> org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:69)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:206)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:91)
> org.apache.beam.sdk.Pipeline.run(Pipeline.java:266)
> org.apache.beam.integration.nexmark.NexmarkRunner.run(NexmarkRunner.java:1233)
> org.apache.beam.integration.nexmark.NexmarkDriver.runAll(NexmarkDriver.java:69)
> org.apache.beam.integration.nexmark.drivers.NexmarkSparkDriver.main(NexmarkSparkDriver.java:46)
>   at 
> org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2257)
>   at 
> org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2239)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1717) Maven release/deploy tries to uploads some artifacts more than once

2017-03-23 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938372#comment-15938372
 ] 

Amit Sela commented on BEAM-1717:
-

Looks like https://github.com/apache/beam/pull/2261 and the patch I described 
above fix the issues in {{beam-sdks-java-core}}.
Now there's a similar issue in dataflow-runner; I'll keep digging.

> Maven release/deploy tries to uploads some artifacts more than once 
> 
>
> Key: BEAM-1717
> URL: https://issues.apache.org/jira/browse/BEAM-1717
> Project: Beam
>  Issue Type: Bug
>  Components: build-system
>Reporter: Amit Sela
>Assignee: Amit Sela
>Priority: Minor
>
> Running maven {{release}} or {{deploy}} causes some artifacts to deploy more 
> than once which fails deployments to release Nexus.
> While this is not an issue for the Apache release process (because it uses a 
> staging Nexus), this affects users who wish to deploy their own fork. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module

2017-03-23 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938115#comment-15938115
 ] 

Amit Sela commented on BEAM-1789:
-

Please keep in mind that Spark runner support for streaming is still 
experimental.
Once we finish up the work on streaming and have an IT running 
WindowedWordCount in streaming, we'll have better visibility.

> window can't not use in spark cluster module
> 
>
> Key: BEAM-1789
> URL: https://issues.apache.org/jira/browse/BEAM-1789
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: tianyou
>Assignee: Amit Sela
>
>  I use Beam in a Spark cluster. The application is below.
> SparkPipelineOptions options = 
> PipelineOptionsFactory.as(SparkPipelineOptions.class);
> options.setRunner(SparkRunner.class);
> options.setEnableSparkMetricSinks(false);
> options.setStreaming(true);
> options.setSparkMaster("spark://10.100.124.205:6066");
> options.setAppName("Beam App Spark"+new Random().nextFloat());
> options.setJobName("Beam Job Spark"+new Random().nextFloat());
> System.out.println("App Name:"+options.getAppName());
> System.out.println("Job Name:"+options.getJobName());
> options.setMaxRecordsPerBatch(10L);
> 
> //  PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
> 
> //  Duration size = Duration.standardMinutes(4);
> long duration = 60;
> if(args!=null && args.length==1){
> duration = Integer.valueOf(args[0]);
> }
> Duration size = Duration.standardSeconds(duration);
> System.out.println("Time window: ["+duration+"] seconds");
> Window.Bound<KV<String, String>> fixWindow = 
> Window.<KV<String, String>>into(
> FixedWindows.of(size)
> );
> 
> String kafkaAddress = "10.100.124.208:9093";
> //  String kafkaAddress = "192.168.100.212:9092";
> 
> Map<String, Object> kfConsunmerConf = new HashMap<String, Object>();
> kfConsunmerConf.put("auto.offset.reset", "latest");
> PCollection<String> kafkaJsonPc = p.apply(KafkaIO.<String, String>
> read()
> .withBootstrapServers(kafkaAddress)
> .withTopics(ImmutableList.of("wypxx1"))
> .withKeyCoder(StringUtf8Coder.of())
> .withValueCoder(StringUtf8Coder.of())
> .updateConsumerProperties(kfConsunmerConf)
> .withoutMetadata()
> ).apply(Values.<String>create());
> 
> 
> PCollection<KV<String, String>> totalPc = kafkaJsonPc.apply(
> "count line",
> ParDo.of(new DoFn<String, KV<String, String>>() {
> @ProcessElement
>   public void processElement(ProcessContext c) {
> String line = c.element();
> Instant is = c.timestamp();
> if(line.length()>2)
>   line = line.substring(0,2);
> System.out.println(line + " " + is.toString());
> c.output(KV.of(line, line));
>   }
>  })
> );
> 
> 
> PCollection<KV<String, Iterable<String>>> itPc = 
> totalPc.apply(fixWindow).apply(
> "group by appKey",
> GroupByKey.create()
> );
>   itPc.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
> @ProcessElement
> public void processElement(ProcessContext c) {
> KV<String, Iterable<String>> keyIt = c.element();
> String key = keyIt.getKey();
> Iterable<String> itb = keyIt.getValue();
> Iterator<String> it = itb.iterator();
> StringBuilder sb = new StringBuilder();
> sb.append(key).append(":[");
> while(it.hasNext()){
> sb.append(it.next()).append(",");
> }
> String str = sb.toString();
> str = str.substring(0,str.length() -1) + "]";
> System.out.println(str);
> String filePath = "/data/wyp/sparktest.txt";
> String line = "word-->["+key+"]total count="+str+"--->time+"+c.timestamp().toString();
> System.out.println("writefile->"+line);
> FileUtil.write(filePath, line, true, true);
> }
> 
>  }));
>   
> p.run().waitUntilFinish();
> When I submit the application to the Spark cluster, in the Spark UI I can see the 
> log of the totalPc PCollection after one minute, but I can't see the log of the 
> itPc PCollection.
> When I use Spark in local mode, it works well.
> Please help me to resolve this problem. Thanks!



--
This message was sent by At

[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module

2017-03-23 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938032#comment-15938032
 ] 

Amit Sela commented on BEAM-1789:
-

In local mode all the logging is in one single log; in cluster mode each 
executor writes its logs locally - that's what I'm trying to figure out: are 
you looking at all the executors' logs for the missing log line ?
Also, I think you might want to change the name of this issue, since it doesn't 
seem to be related to windows (or am I missing something) ? 

> window can't not use in spark cluster module
> 
>
> Key: BEAM-1789
> URL: https://issues.apache.org/jira/browse/BEAM-1789
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: tianyou
>Assignee: Amit Sela
>
>  I use Beam in a Spark cluster. The application is below.
> SparkPipelineOptions options = 
> PipelineOptionsFactory.as(SparkPipelineOptions.class);
> options.setRunner(SparkRunner.class);
> options.setEnableSparkMetricSinks(false);
> options.setStreaming(true);
> options.setSparkMaster("spark://10.100.124.205:6066");
> options.setAppName("Beam App Spark"+new Random().nextFloat());
> options.setJobName("Beam Job Spark"+new Random().nextFloat());
> System.out.println("App Name:"+options.getAppName());
> System.out.println("Job Name:"+options.getJobName());
> options.setMaxRecordsPerBatch(10L);
> 
> //  PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
> 
> //  Duration size = Duration.standardMinutes(4);
> long duration = 60;
> if(args!=null && args.length==1){
> duration = Integer.valueOf(args[0]);
> }
> Duration size = Duration.standardSeconds(duration);
> System.out.println("Time window: ["+duration+"] seconds");
> Window.Bound<KV<String, String>> fixWindow = 
> Window.<KV<String, String>>into(
> FixedWindows.of(size)
> );
> 
> String kafkaAddress = "10.100.124.208:9093";
> //  String kafkaAddress = "192.168.100.212:9092";
> 
> Map<String, Object> kfConsunmerConf = new HashMap<String, Object>();
> kfConsunmerConf.put("auto.offset.reset", "latest");
> PCollection<String> kafkaJsonPc = p.apply(KafkaIO.<String, String>
> read()
> .withBootstrapServers(kafkaAddress)
> .withTopics(ImmutableList.of("wypxx1"))
> .withKeyCoder(StringUtf8Coder.of())
> .withValueCoder(StringUtf8Coder.of())
> .updateConsumerProperties(kfConsunmerConf)
> .withoutMetadata()
> ).apply(Values.<String>create());
> 
> 
> PCollection<KV<String, String>> totalPc = kafkaJsonPc.apply(
> "count line",
> ParDo.of(new DoFn<String, KV<String, String>>() {
> @ProcessElement
>   public void processElement(ProcessContext c) {
> String line = c.element();
> Instant is = c.timestamp();
> if(line.length()>2)
>   line = line.substring(0,2);
> System.out.println(line + " " + is.toString());
> c.output(KV.of(line, line));
>   }
>  })
> );
> 
> 
> PCollection<KV<String, Iterable<String>>> itPc = 
> totalPc.apply(fixWindow).apply(
> "group by appKey",
> GroupByKey.create()
> );
>   itPc.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
> @ProcessElement
> public void processElement(ProcessContext c) {
> KV<String, Iterable<String>> keyIt = c.element();
> String key = keyIt.getKey();
> Iterable<String> itb = keyIt.getValue();
> Iterator<String> it = itb.iterator();
> StringBuilder sb = new StringBuilder();
> sb.append(key).append(":[");
> while(it.hasNext()){
> sb.append(it.next()).append(",");
> }
> String str = sb.toString();
> str = str.substring(0,str.length() -1) + "]";
> System.out.println(str);
> String filePath = "/data/wyp/sparktest.txt";
> String line = "word-->["+key+"]total count="+str+"--->time+"+c.timestamp().toString();
> System.out.println("writefile->"+line);
> FileUtil.write(filePath, line, true, true);
> }
> 
>  }));
>   
> p.run().waitUntilFinish();
> When I submit the application to the Spark cluster, in the Spark UI I can see the 
> log of the totalPc PCollection after one minute, but I can't see the log 

[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module

2017-03-23 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15937871#comment-15937871
 ] 

Amit Sela commented on BEAM-1789:
-

[~tianyou] where are you looking at the logs ? In the Spark UI ? Which executors ? 
Not all executors will run tasks for all steps of the pipeline, so an executor 
that runs tasks that process totalPc but no task for itPc won't show the log.

A couple of questions/notes: 
Do you see any exceptions/errors ? Is the application behaving as expected 
(besides the logging) ?
Which Beam version are you running ?
Is there a particular reason you write to a file from within the DoFn instead of 
using Beam's TextIO/HdfsIO ?
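
For reference, a rough sketch of what that could look like (hedged: assumes a 
Beam release where {{TextIO.write()}} supports windowed writes; the formatting 
DoFn and output prefix are illustrative only):

{code}
PCollection<String> formatted = itPc.apply("FormatCounts",
    ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(c.element().getKey() + " -> " + c.element().getValue());
      }
    }));

formatted.apply(TextIO.write()
    .to("/data/wyp/sparktest")   // output prefix, illustrative
    .withWindowedWrites()
    .withNumShards(1));
{code}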
  

> window can't not use in spark cluster module
> 
>
> Key: BEAM-1789
> URL: https://issues.apache.org/jira/browse/BEAM-1789
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: tianyou
>Assignee: Amit Sela
>
>  I use Beam in a Spark cluster. The application is below.
> SparkPipelineOptions options = 
> PipelineOptionsFactory.as(SparkPipelineOptions.class);
> options.setRunner(SparkRunner.class);
> options.setEnableSparkMetricSinks(false);
> options.setStreaming(true);
> options.setSparkMaster("spark://10.100.124.205:6066");
> options.setAppName("Beam App Spark"+new Random().nextFloat());
> options.setJobName("Beam Job Spark"+new Random().nextFloat());
> System.out.println("App Name:"+options.getAppName());
> System.out.println("Job Name:"+options.getJobName());
> options.setMaxRecordsPerBatch(10L);
> 
> //  PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
> 
> //  Duration size = Duration.standardMinutes(4);
> long duration = 60;
> if(args!=null && args.length==1){
> duration = Integer.valueOf(args[0]);
> }
> Duration size = Duration.standardSeconds(duration);
> System.out.println("Time window: ["+duration+"] seconds");
> Window.Bound<KV<String, String>> fixWindow = 
> Window.<KV<String, String>>into(
> FixedWindows.of(size)
> );
> 
> String kafkaAddress = "10.100.124.208:9093";
> //  String kafkaAddress = "192.168.100.212:9092";
> 
> Map<String, Object> kfConsunmerConf = new HashMap<String, Object>();
> kfConsunmerConf.put("auto.offset.reset", "latest");
> PCollection<String> kafkaJsonPc = p.apply(KafkaIO.<String, String>
> read()
> .withBootstrapServers(kafkaAddress)
> .withTopics(ImmutableList.of("wypxx1"))
> .withKeyCoder(StringUtf8Coder.of())
> .withValueCoder(StringUtf8Coder.of())
> .updateConsumerProperties(kfConsunmerConf)
> .withoutMetadata()
> ).apply(Values.<String>create());
> 
> 
> PCollection<KV<String, String>> totalPc = kafkaJsonPc.apply(
> "count line",
> ParDo.of(new DoFn<String, KV<String, String>>() {
> @ProcessElement
>   public void processElement(ProcessContext c) {
> String line = c.element();
> Instant is = c.timestamp();
> if(line.length()>2)
>   line = line.substring(0,2);
> System.out.println(line + " " + is.toString());
> c.output(KV.of(line, line));
>   }
>  })
> );
> 
> 
> PCollection<KV<String, Iterable<String>>> itPc = 
> totalPc.apply(fixWindow).apply(
> "group by appKey",
> GroupByKey.create()
> );
>   itPc.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
> @ProcessElement
> public void processElement(ProcessContext c) {
> KV<String, Iterable<String>> keyIt = c.element();
> String key = keyIt.getKey();
> Iterable<String> itb = keyIt.getValue();
> Iterator<String> it = itb.iterator();
> StringBuilder sb = new StringBuilder();
> sb.append(key).append(":[");
> while(it.hasNext()){
> sb.append(it.next()).append(",");
> }
> String str = sb.toString();
> str = str.substring(0,str.length() -1) + "]";
> System.out.println(str);
> String filePath = "/data/wyp/sparktest.txt";
> String line = "word-->["+key+"]total count="+str+"--->time+"+c.timestamp().toString();
> System.out.println("writefile->"+line);
> FileUtil.write(filePath, line, true, true);
> }
> 
>  }));
>   
> p.run().waitUn

[jira] [Commented] (BEAM-1775) fix issue of start_from_previous_offset in KafkaIO

2017-03-22 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15936210#comment-15936210
 ] 

Amit Sela commented on BEAM-1775:
-

[~mingmxu] so you want this to be a "fallback" in case runners choose not to 
checkpoint ? I believe that runners shouldn't (and can't) make this choice and 
still say they support reading from unbounded sources.

> fix issue of start_from_previous_offset in KafkaIO
> --
>
> Key: BEAM-1775
> URL: https://issues.apache.org/jira/browse/BEAM-1775
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Xu Mingmin
>Assignee: Xu Mingmin
>
> Jins George jins.geo...@aeris.net via aermail.onmicrosoft.com 
>   
> 5:50 PM (15 hours ago)
>   
> to user
> Hello,
> I am writing a Beam pipeline (streaming) with the Flink runner to consume data 
> from Kafka, apply some transformations, and persist to HBase.
> If I restart the application (due to failure/manual restart), the consumer does 
> not resume from the offset where it was prior to the restart. It always resumes 
> from the latest offset.
> If I enable Flink checkpointing with the HDFS state back-end, the system appears 
> to be resuming from the earliest offset.
> Is there a recommended way to resume from the offset where it was stopped ?
> Thanks,
> Jins George



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1765) Remove Aggregators from Spark runner

2017-03-21 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15935311#comment-15935311
 ] 

Amit Sela commented on BEAM-1765:
-

I'm wondering what the timeline here is ? Last I checked, the direct and Spark 
runners were the only runners to support {{Metrics}}. 

> Remove Aggregators from Spark runner
> 
>
> Key: BEAM-1765
> URL: https://issues.apache.org/jira/browse/BEAM-1765
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Pablo Estrada
>Assignee: Amit Sela
>
> I have started removing aggregators from the Java SDK, but runners use them 
> in different ways that I can't figure out well. This is to track the 
> independent effort in Spark.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1775) fix issue of start_from_previous_offset in KafkaIO

2017-03-21 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15935305#comment-15935305
 ] 

Amit Sela commented on BEAM-1775:
-

I think this might be a Flink-runner-specific issue (or just a missing/bad 
configuration), as I believe that both the Dataflow runner and the Spark runner 
will resume from the latest {{CheckpointMark}} upon restart.  

> fix issue of start_from_previous_offset in KafkaIO
> --
>
> Key: BEAM-1775
> URL: https://issues.apache.org/jira/browse/BEAM-1775
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Xu Mingmin
>Assignee: Xu Mingmin
>
> Jins George jins.geo...@aeris.net via aermail.onmicrosoft.com 
>   
> 5:50 PM (15 hours ago)
>   
> to user
> Hello,
> I am writing a Beam pipeline (streaming) with the Flink runner to consume data 
> from Kafka, apply some transformations, and persist to HBase.
> If I restart the application (due to failure/manual restart), the consumer does 
> not resume from the offset where it was prior to the restart. It always resumes 
> from the latest offset.
> If I enable Flink checkpointing with the HDFS state back-end, the system appears 
> to be resuming from the earliest offset.
> Is there a recommended way to resume from the offset where it was stopped ?
> Thanks,
> Jins George



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-507) Fill in the documentation/runners/spark portion of the website

2017-03-20 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15933238#comment-15933238
 ] 

Amit Sela commented on BEAM-507:


Yup, thanks for the GC ;)

> Fill in the documentation/runners/spark portion of the website
> --
>
> Key: BEAM-507
> URL: https://issues.apache.org/jira/browse/BEAM-507
> Project: Beam
>  Issue Type: Bug
>  Components: website
>Reporter: Frances Perry
>Assignee: Amit Sela
> Fix For: Not applicable
>
>
> As per 
> https://docs.google.com/document/d/1-0jMv7NnYp0Ttt4voulUMwVe_qjBYeNMLm2LusYF3gQ/edit.
> Should be a landing page for Spark-specific information.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-507) Fill in the documentation/runners/spark portion of the website

2017-03-20 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-507.

   Resolution: Fixed
Fix Version/s: Not applicable

> Fill in the documentation/runners/spark portion of the website
> --
>
> Key: BEAM-507
> URL: https://issues.apache.org/jira/browse/BEAM-507
> Project: Beam
>  Issue Type: Bug
>  Components: website
>Reporter: Frances Perry
>Assignee: Amit Sela
> Fix For: Not applicable
>
>
> As per 
> https://docs.google.com/document/d/1-0jMv7NnYp0Ttt4voulUMwVe_qjBYeNMLm2LusYF3gQ/edit.
> Should be a landing page for Spark-specific information.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.

2017-03-19 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931859#comment-15931859
 ] 

Amit Sela commented on BEAM-1582:
-

Moved tests that use checkpoint recovery to post-commit. Keeping open since the 
problem is still there.

> ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
> --
>
> Key: BEAM-1582
> URL: https://issues.apache.org/jira/browse/BEAM-1582
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
> Fix For: First stable release
>
>
> See: 
> https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/
> After some digging in it appears that a second firing occurs (though only one 
> is expected) but it doesn't come from a stale state (state is empty before it 
> fires).
> Might be a retry happening for some reason, which is OK in terms of 
> fault-tolerance guarantees (at-least-once), but not so much in terms of flaky 
> tests. 
> I'm looking into this hoping to fix this ASAP.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1752) Tag Spark runner tests that recover from checkpoint.

2017-03-19 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1752.
-
   Resolution: Fixed
Fix Version/s: First stable release

> Tag Spark runner tests that recover from checkpoint.
> 
>
> Key: BEAM-1752
> URL: https://issues.apache.org/jira/browse/BEAM-1752
> Project: Beam
>  Issue Type: Test
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
> Fix For: First stable release
>
>
> Add a tag to mark tests that validate recovery from checkpoint.
> This could help run such tests as part of a profile (e.g., PostCommit), 
> since they tend to flake because of Spark-related issues. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1752) Tag Spark runner tests that recover from checkpoint.

2017-03-19 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1752:
---

 Summary: Tag Spark runner tests that recover from checkpoint.
 Key: BEAM-1752
 URL: https://issues.apache.org/jira/browse/BEAM-1752
 Project: Beam
  Issue Type: Test
  Components: runner-spark
Reporter: Amit Sela
Assignee: Amit Sela


Add a tag to mark tests that validate recovery from checkpoint.
This could help run such tests as part of a profile (e.g., PostCommit), 
since they tend to flake because of Spark-related issues. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.

2017-03-17 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895861#comment-15895861
 ] 

Amit Sela edited comment on BEAM-1582 at 3/17/17 10:21 PM:
---

Could be related to SPARK-14701 and/or SPARK-14930 so that the 
{{CheckpointMark}} is not properly checkpointed.
If for some reason the runtime environment was so slow that it failed to start 
execution until the timeout was hit, a graceful stop would force it to at least 
finish the first batch. If that first batch included the read from Kafka on the 
one hand, while failing to checkpoint the {{Reader}} mark on the other, resuming 
from the checkpoint would read the whole Kafka backlog again, causing the 
failures we see.

I'll have a look at the failed tests' execution time to figure out if that seems 
to be the case, and if so I will simply move this test to post-commit, because 
this issue in Spark was only resolved in v2.0.


was (Author: amitsela):
Could be related to SPARK-14701 and/or SPARK-14930 so that the last 
{{CheckpointMark}} is not properly checkpointed.
If for some reason the runtime environment was so slow that it failed to start 
execution until the timeout was hit, a graceful stop would force it to at least 
finish the first batch. If that first batch included the read from Kafka on the 
one hand, while failing to checkpoint the {{Reader}} mark on the other, resuming 
from the checkpoint would read the whole Kafka backlog again, causing the 
failures we see.

I'll have a look at the failed tests' execution time to figure out if that seems 
to be the case, and if so I will simply move this test to post-commit, because 
this issue in Spark was only resolved in v2.0.

> ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
> --
>
> Key: BEAM-1582
> URL: https://issues.apache.org/jira/browse/BEAM-1582
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
> Fix For: First stable release
>
>
> See: 
> https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/
> After some digging in it appears that a second firing occurs (though only one 
> is expected) but it doesn't come from a stale state (state is empty before it 
> fires).
> Might be a retry happening for some reason, which is OK in terms of 
> fault-tolerance guarantees (at-least-once), but not so much in terms of flaky 
> tests. 
> I'm looking into this hoping to fix this ASAP.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.

2017-03-17 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895861#comment-15895861
 ] 

Amit Sela edited comment on BEAM-1582 at 3/17/17 10:21 PM:
---

Could be related to SPARK-14701 and/or SPARK-14930 so that the last 
{{CheckpointMark}} is not properly checkpointed.
If for some reason the runtime environment was so slow that it failed to start 
execution until the timeout was hit, a graceful stop would force it to at least 
finish the first batch. If that first batch included the read from Kafka on the 
one hand, while failing to checkpoint the {{Reader}} mark on the other, resuming 
from the checkpoint would read the whole Kafka backlog again, causing the 
failures we see.

I'll have a look at the failed tests' execution time to figure out if that seems 
to be the case, and if so I will simply move this test to post-commit, because 
this issue in Spark was only resolved in v2.0.


was (Author: amitsela):
Could be related to SPARK-14701 so that the last {{CheckpointMark}} is not 
properly checkpointed.
If for some reason the runtime environment was so slow that it failed to start 
execution until the timeout was hit, a graceful stop would force it to at least 
finish the first batch. If that first batch included the read from Kafka on the 
one hand, while failing to checkpoint the {{Reader}} mark on the other, resuming 
from the checkpoint would read the whole Kafka backlog again, causing the 
failures we see.

I'll have a look at the failed tests' execution time to figure out if that seems 
to be the case, and if so I will simply move this test to post-commit, because 
this issue in Spark was only resolved in v2.0.

> ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
> --
>
> Key: BEAM-1582
> URL: https://issues.apache.org/jira/browse/BEAM-1582
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
> Fix For: First stable release
>
>
> See: 
> https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/
> After some digging in it appears that a second firing occurs (though only one 
> is expected) but it doesn't come from a stale state (state is empty before it 
> fires).
> Might be a retry happening for some reason, which is OK in terms of 
> fault-tolerance guarantees (at-least-once), but not so much in terms of flaky 
> tests. 
> I'm looking into this hoping to fix this ASAP.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.

2017-03-17 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895861#comment-15895861
 ] 

Amit Sela edited comment on BEAM-1582 at 3/17/17 10:19 PM:
---

Could be related to SPARK-14701 so that the last {{CheckpointMark}} is not 
properly checkpointed.
If for some reason the runtime environment was so slow that it failed to start 
execution until the timeout was hit, a graceful stop would force it to at least 
finish the first batch. If that first batch included the read from Kafka on the 
one hand, while failing to checkpoint the {{Reader}} mark on the other, resuming 
from the checkpoint would read the whole Kafka backlog again, causing the 
failures we see.

I'll have a look at the failed tests' execution time to figure out if that seems 
to be the case, and if so I will simply move this test to post-commit, because 
this issue in Spark was only resolved in v2.0.


was (Author: amitsela):
Could be related to SPARK-16480 so that the last {{CheckpointMark}} is not 
properly checkpointed.
If for some reason the runtime environment was so slow that it failed to start 
execution until the timeout was hit, a graceful stop would force it to at least 
finish the first batch. If that first batch included the read from Kafka on the 
one hand, while failing to checkpoint the {{Reader}} mark on the other, resuming 
from the checkpoint would read the whole Kafka backlog again, causing the 
failures we see.

I'll have a look at the failed tests' execution time to figure out if that seems 
to be the case, and if so I will simply move this test to post-commit, because 
this issue in Spark was only resolved in v2.0.

> ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
> --
>
> Key: BEAM-1582
> URL: https://issues.apache.org/jira/browse/BEAM-1582
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
> Fix For: First stable release
>
>
> See: 
> https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/
> After some digging in it appears that a second firing occurs (though only one 
> is expected) but it doesn't come from a stale state (state is empty before it 
> fires).
> Might be a retry happening for some reason, which is OK in terms of 
> fault-tolerance guarantees (at-least-once), but not so much in terms of flaky 
> tests. 
> I'm looking into this hoping to fix this ASAP.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.

2017-03-17 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895861#comment-15895861
 ] 

Amit Sela edited comment on BEAM-1582 at 3/17/17 10:14 PM:
---

Could be related to SPARK-16480 so that the last {{CheckpointMark}} is not 
properly checkpointed.
If for some reason the runtime environment was so slow that it failed to start 
execution until the timeout was hit, a graceful stop would force it to at least 
finish the first batch. If that first batch included the read from Kafka on the 
one hand, while failing to checkpoint the {{Reader}} mark on the other, resuming 
from the checkpoint would read the whole Kafka backlog again, causing the 
failures we see.

I'll have a look at the failed tests' execution time to figure out if that seems 
to be the case, and if so I will simply move this test to post-commit, because 
this issue in Spark was only resolved in v2.0.


was (Author: amitsela):
Could be related to SPARK-16480 so that the last {{CheckpointMark}} is not 
properly checkpointed.

> ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
> --
>
> Key: BEAM-1582
> URL: https://issues.apache.org/jira/browse/BEAM-1582
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
> Fix For: First stable release
>
>
> See: 
> https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/
> After some digging in it appears that a second firing occurs (though only one 
> is expected) but it doesn't come from a stale state (state is empty before it 
> fires).
> Might be a retry happening for some reason, which is OK in terms of 
> fault-tolerance guarantees (at-least-once), but not so much in terms of flaky 
> tests. 
> I'm looking into this hoping to fix this ASAP.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1717) Maven release/deploy tries to uploads some artifacts more than once

2017-03-14 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924819#comment-15924819
 ] 

Amit Sela commented on BEAM-1717:
-

Once this was worked out, I noticed a duplicate {{test}} jar in 
{{beam-sdks-java-core}}.
Not sure what the issue is here, but running Maven in debug mode shows that in 
the execution of {{maven-install-plugin}} the {{attachedArtifacts}} contain 
both {{org.apache.beam:beam-sdks-java-core:test-jar:tests}} and 
{{org.apache.beam:beam-sdks-java-core:jar:tests}}, which seems to cause the 
duplication.
While the {{default-test-jar}} execution id (jar plugin) calls the {{test-jar}} 
goal, I wonder if the {{jar}} goal also creates a {{tests}} jar. 

> Maven release/deploy tries to uploads some artifacts more than once 
> 
>
> Key: BEAM-1717
> URL: https://issues.apache.org/jira/browse/BEAM-1717
> Project: Beam
>  Issue Type: Bug
>  Components: build-system
>Reporter: Amit Sela
>Assignee: Amit Sela
>Priority: Minor
>
> Running maven {{release}} or {{deploy}} causes some artifacts to deploy more 
> than once which fails deployments to release Nexus.
> While this is not an issue for the Apache release process (because it uses a 
> staging Nexus), this affects users who wish to deploy their own fork. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1717) Maven release/deploy tries to uploads some artifacts more than once

2017-03-14 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924812#comment-15924812
 ] 

Amit Sela commented on BEAM-1717:
-

Seems that there is more than one issue here.

The first one is with {{beam-sdks-common-fn-api}}, which produces the {{javadoc}} 
jar twice.
This seemed to be resolved by changing the execution id of 
{{maven-javadoc-plugin}} in {{beam-parent.xml}} from {{javadoc}} to 
{{attach-javadocs}} like this:

{code}
...
<profile>
  <id>release</id>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-javadoc-plugin</artifactId>
        <executions>
          <execution>
            <id>attach-javadocs</id>
            <phase>package</phase>
            <goals>
              <goal>jar</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</profile>
...
{code}

> Maven release/deploy tries to uploads some artifacts more than once 
> 
>
> Key: BEAM-1717
> URL: https://issues.apache.org/jira/browse/BEAM-1717
> Project: Beam
>  Issue Type: Bug
>  Components: build-system
>Reporter: Amit Sela
>Assignee: Amit Sela
>Priority: Minor
>
> Running maven {{release}} or {{deploy}} causes some artifacts to deploy more 
> than once which fails deployments to release Nexus.
> While this is not an issue for the Apache release process (because it uses a 
> staging Nexus), this affects users who wish to deploy their own fork. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1717) Maven release/deploy tries to uploads some artifacts more than once

2017-03-14 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1717:
---

 Summary: Maven release/deploy tries to uploads some artifacts more 
than once 
 Key: BEAM-1717
 URL: https://issues.apache.org/jira/browse/BEAM-1717
 Project: Beam
  Issue Type: Bug
  Components: build-system
Reporter: Amit Sela
Assignee: Amit Sela
Priority: Minor


Running maven {{release}} or {{deploy}} causes some artifacts to deploy more 
than once which fails deployments to release Nexus.

While this is not an issue for the Apache release process (because it uses a 
staging Nexus), this affects users who wish to deploy their own fork. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Reopened] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.

2017-03-12 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela reopened BEAM-1582:
-

This test keeps flaking so I'll leave it open until we resolve it, or move to 
PostCommit and accept the fact that it flakes at times.

> ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
> --
>
> Key: BEAM-1582
> URL: https://issues.apache.org/jira/browse/BEAM-1582
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
> Fix For: First stable release
>
>
> See: 
> https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/
> After some digging in it appears that a second firing occurs (though only one 
> is expected) but it doesn't come from a stale state (state is empty before it 
> fires).
> Might be a retry happening for some reason, which is OK in terms of 
> fault-tolerance guarantees (at-least-once), but not so much in terms of flaky 
> tests. 
> I'm looking into this hoping to fix this ASAP.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-797) A PipelineVisitor that creates a Spark-native pipeline.

2017-03-10 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-797.

   Resolution: Fixed
Fix Version/s: First stable release

> A PipelineVisitor that creates a Spark-native pipeline. 
> 
>
> Key: BEAM-797
> URL: https://issues.apache.org/jira/browse/BEAM-797
> Project: Beam
>  Issue Type: Wish
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Aviem Zur
>Priority: Minor
> Fix For: First stable release
>
>
> It could be very useful for debugging purposes to have a custom 
> PipelineVisitor that can tell what underlying Spark code is being 
> called.
> One idea:
> This could be enabled with a flag in SparkPipelineOptions and, instead of 
> executing the pipeline, it would print the underlying Spark DAG.
> Clearly, DoFn internals would be obfuscated, but the Spark code could note 
> something like {{mapPartitions("ExtractWords")}}.
> Another difference would be Sources, as they are a custom implementation for 
> Beam.
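
A minimal, hypothetical sketch of the idea (not a proposed implementation; the 
printed text is made up), using the SDK's visitor API to list primitive 
transforms instead of executing them:

{code}
pipeline.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() {
  @Override
  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
    // A real implementation would map the node to the runner's Spark call,
    // e.g. mapPartitions("ExtractWords"); here we only print the name.
    System.out.println("Would translate: " + node.getFullName());
  }
});
{code}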



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1562) Use a "signal" to stop streaming tests as they finish.

2017-03-09 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1562.
-
   Resolution: Fixed
Fix Version/s: First stable release

> Use a "signal" to stop streaming tests as they finish.
> --
>
> Key: BEAM-1562
> URL: https://issues.apache.org/jira/browse/BEAM-1562
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
> Fix For: First stable release
>
>
> Streaming tests use a timeout that has to include a large enough buffer to avoid 
> slow runtimes stopping before the test completes.
> We can introduce a "poison pill" based on an {{EndOfStream}} element that 
> would be counted in a Metrics/Aggregator to know all data was processed.
> Another option would be to follow the slowest watermark and stop once it hits 
> end-of-time.
> This requires the Spark runner to stop blocking the execution of a streaming 
> pipeline until it's complete - which is relevant to the 
> {{PipelineResult}} (re)design that is being discussed in BEAM-849.
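
A hedged sketch of the "poison pill" idea (the marker value, namespace, and 
counter name are made up):

{code}
private static final Counter END_OF_STREAM =
    Metrics.counter("streaming-test", "endOfStream");

@ProcessElement
public void processElement(ProcessContext c) {
  if ("EOS".equals(c.element())) {
    END_OF_STREAM.inc();  // the test harness polls this metric to know it can stop
  } else {
    // regular processing
  }
}
{code}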



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.

2017-03-09 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1582.
-
   Resolution: Fixed
Fix Version/s: First stable release

> ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
> --
>
> Key: BEAM-1582
> URL: https://issues.apache.org/jira/browse/BEAM-1582
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
> Fix For: First stable release
>
>
> See: 
> https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/
> After some digging in it appears that a second firing occurs (though only one 
> is expected) but it doesn't come from a stale state (state is empty before it 
> fires).
> Might be a retry happening for some reason, which is OK in terms of 
> fault-tolerance guarantees (at-least-once), but not so much in terms of flaky 
> tests. 
> I'm looking into this hoping to fix this ASAP.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1636) UnboundedDataset action() does not materialize RDD

2017-03-07 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1636.
-
   Resolution: Fixed
Fix Version/s: 0.6.0

> UnboundedDataset action() does not materialize RDD
> --
>
> Key: BEAM-1636
> URL: https://issues.apache.org/jira/browse/BEAM-1636
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Aviem Zur
> Fix For: 0.6.0
>
>
> {{UnboundedDataset#action}} does not materialize the DStream since it uses 
> {{register}}. Instead, use {{foreachRDD}}...{{foreach}}, which is an action 
> and will materialize the RDDs of the DStream.
> The reason it worked until now is that there was also a call to 
> {{cache()}} before it.
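
For illustration, a hedged plain-Spark sketch of the difference (variable names 
are made up):

{code}
// register() only marks the DStream as an output; nothing forces evaluation.
// foreachRDD + foreach runs an action on every micro-batch, materializing it.
dStream.foreachRDD(rdd -> rdd.foreach(element -> {
  // no-op: the action itself is what materializes the RDD
}));
{code}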



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1556) Spark executors need to register IO factories

2017-03-07 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1556.
-
   Resolution: Fixed
Fix Version/s: 0.6.0

> Spark executors need to register IO factories
> -
>
> Key: BEAM-1556
> URL: https://issues.apache.org/jira/browse/BEAM-1556
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Frances Perry
>Assignee: Amit Sela
> Fix For: 0.6.0
>
>
> The Spark executors need to call IOChannelUtils.registerIOFactories(options) 
> in order to support GCS files and make the default WordCount example work.
> Context in this thread: 
> https://lists.apache.org/thread.html/469a139c9eb07e64e514cdea42ab8000678ab743794a090c365205d7@%3Cuser.beam.apache.org%3E



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1623) Transform Reshuffle directly in Spark runner

2017-03-06 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1623.
-
   Resolution: Fixed
Fix Version/s: 0.6.0

> Transform Reshuffle directly in Spark runner
> 
>
> Key: BEAM-1623
> URL: https://issues.apache.org/jira/browse/BEAM-1623
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Aviem Zur
> Fix For: 0.6.0
>
>
> Transform {{Reshuffle}} directly in the Spark runner.
> Spark's {{repartition}} is logically equivalent to {{Reshuffle}} and will 
> cause the required fusion break without incurring the overhead that 
> {{groupByKey}} (which in streaming pipelines would use {{updateStateByKey}}) 
> adds compared to a simple {{repartition}}.
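
A hedged sketch of the translation (not the runner's actual translator code; the 
element type is illustrative):

{code}
// A Reshuffle-like fusion break expressed directly as a Spark repartition,
// avoiding the shuffle + regroup cost of a full groupByKey.
JavaRDD<String> reshuffled = input.repartition(input.getNumPartitions());
{code}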



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1626) Remove caching of read MapWithStateDStream.

2017-03-06 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1626.
-
   Resolution: Fixed
Fix Version/s: 0.6.0

> Remove caching of read MapWithStateDStream.
> ---
>
> Key: BEAM-1626
> URL: https://issues.apache.org/jira/browse/BEAM-1626
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
> Fix For: 0.6.0
>
>
> There's no real need for it since checkpointing caches as well, and from my 
> experiments I think it also has something to do with some of the flakes in 
> streaming tests.
> Anyway, I don't see a good reason to call {{cache()}} there, so let's remove 
> it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1565) Update Spark runner PostCommit Jenkins job.

2017-03-05 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1565.
-
   Resolution: Fixed
Fix Version/s: 0.6.0

> Update Spark runner PostCommit Jenkins job.
> ---
>
> Key: BEAM-1565
> URL: https://issues.apache.org/jira/browse/BEAM-1565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Aviem Zur
>Priority: Minor
>  Labels: beginner, low-hanging-fruit
> Fix For: 0.6.0
>
>
> Should only activate {{runnable-on-service}} profile (and 
> {{streaming-runnable-on-service}} once enabled) and remove 
> {{-Dspark.port.maxRetries=64}}. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (BEAM-1556) Spark executors need to register IO factories

2017-03-05 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela reassigned BEAM-1556:
---

Assignee: Amit Sela  (was: Jean-Baptiste Onofré)

> Spark executors need to register IO factories
> -
>
> Key: BEAM-1556
> URL: https://issues.apache.org/jira/browse/BEAM-1556
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Frances Perry
>Assignee: Amit Sela
>
> The Spark executors need to call IOChannelUtils.registerIOFactories(options) 
> in order to support GCS files and make the default WordCount example work.
> Context in this thread: 
> https://lists.apache.org/thread.html/469a139c9eb07e64e514cdea42ab8000678ab743794a090c365205d7@%3Cuser.beam.apache.org%3E



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1626) Remove caching of read MapWithStateDStream.

2017-03-05 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1626:
---

 Summary: Remove caching of read MapWithStateDStream.
 Key: BEAM-1626
 URL: https://issues.apache.org/jira/browse/BEAM-1626
 Project: Beam
  Issue Type: Bug
  Components: runner-spark
Reporter: Amit Sela
Assignee: Amit Sela


There's no real need for it since checkpointing caches as well, and from my 
experiments I think it also has something to do with some of the flakes in 
streaming tests.
Anyway, I don't see a good reason to call {{cache()}} there, so let's remove it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1625) BoundedDataset action() does not materialize RDD

2017-03-05 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1625.
-
   Resolution: Fixed
Fix Version/s: 0.6.0

> BoundedDataset action() does not materialize RDD
> 
>
> Key: BEAM-1625
> URL: https://issues.apache.org/jira/browse/BEAM-1625
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Aviem Zur
> Fix For: 0.6.0
>
>
> {{BoundedDataset#action}} does not materialize the RDD since it uses 
> {{foreachPartition}}, which is not an action; instead use {{foreach}}, which is.
> See: http://spark.apache.org/docs/latest/programming-guide.html#actions



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (BEAM-1608) Add support for Spark cluster metrics to PKB

2017-03-04 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela reassigned BEAM-1608:
---

Assignee: (was: Amit Sela)

> Add support for Spark cluster metrics to PKB
> 
>
> Key: BEAM-1608
> URL: https://issues.apache.org/jira/browse/BEAM-1608
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark, testing
>Reporter: Jason Kuster
>
> See 
> https://docs.google.com/document/d/1PsjGPSN6FuorEEPrKEP3u3m16tyOzph5FnL2DhaRDz0/edit?ts=58a78e73#heading=h.exn0s6jsm24q
>  for more details on what this entails. 
> Blocked on BEAM-1602, adding support for Spark to PKB.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (BEAM-1602) Spark Runner support for PerfKit Benchmarker

2017-03-04 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela reassigned BEAM-1602:
---

Assignee: (was: Amit Sela)

> Spark Runner support for PerfKit Benchmarker
> 
>
> Key: BEAM-1602
> URL: https://issues.apache.org/jira/browse/BEAM-1602
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark, testing
>Reporter: Jason Kuster
>
> See 
> https://docs.google.com/document/d/1PsjGPSN6FuorEEPrKEP3u3m16tyOzph5FnL2DhaRDz0/edit?ts=58a78e73#heading=h.exn0s6jsm24q
>  for more details on what this entails. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1602) Spark Runner support for PerfKit Benchmarker

2017-03-04 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895863#comment-15895863
 ] 

Amit Sela commented on BEAM-1602:
-

[~jasonkuster] what is Spark missing here? Running an IT against a cluster? Why 
not use `spark-submit`? Or does it have to be programmatic?

> Spark Runner support for PerfKit Benchmarker
> 
>
> Key: BEAM-1602
> URL: https://issues.apache.org/jira/browse/BEAM-1602
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark, testing
>Reporter: Jason Kuster
>Assignee: Amit Sela
>
> See 
> https://docs.google.com/document/d/1PsjGPSN6FuorEEPrKEP3u3m16tyOzph5FnL2DhaRDz0/edit?ts=58a78e73#heading=h.exn0s6jsm24q
>  for more details on what this entails. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.

2017-03-04 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895861#comment-15895861
 ] 

Amit Sela commented on BEAM-1582:
-

Could be related to SPARK-16480, meaning the last {{CheckpointMark}} is not 
properly checkpointed.

> ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
> --
>
> Key: BEAM-1582
> URL: https://issues.apache.org/jira/browse/BEAM-1582
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
>
> See: 
> https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/
> After some digging in it appears that a second firing occurs (though only one 
> is expected) but it doesn't come from a stale state (state is empty before it 
> fires).
> Might be a retry happening for some reason, which is OK in terms of 
> fault-tolerance guarantees (at-least-once), but not so much in terms of flaky 
> tests. 
> I'm looking into this hoping to fix this ASAP.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1591) Implement Combine optimizations for GABW in streaming.

2017-03-02 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1591:
---

 Summary: Implement Combine optimizations for GABW in streaming.
 Key: BEAM-1591
 URL: https://issues.apache.org/jira/browse/BEAM-1591
 Project: Beam
  Issue Type: Improvement
  Components: runner-spark
Reporter: Amit Sela


This should be straightforward.
Introduce an {{AccumT}} generic in {{SparkGroupAlsoByWindowViaWindowSet}} and 
call it with {{InputT}} for GBK and with {{AccumT}} for Combine.
Pass the proper {{SystemReduceFn}} instead of creating it in 
{{SparkGroupAlsoByWindowViaWindowSet}}.
For Combine, extract the output from the fired accumulators. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-673) Data locality for Read.Bounded

2017-03-02 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela updated BEAM-673:
---
Description: 
In some distributed filesystems, such as HDFS, we should be able to hint to 
Spark the preferred locations of splits.
Here is an example of how Spark does that for Hadoop RDDs:
https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L249

  was:
In some distributed filesystems, such as HDFS, we should be able to hint to 
Spark the preferred locations of splits.
Here is an example of how Spark does that for Hadoop RDDs:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L252

*Note: in case of 1-to-1 mapping of Read operation (e.g. TextIO) direct 
translation should still be preferred, but this is pending HDFS support for 
Beam anyway.*


> Data locality for Read.Bounded
> --
>
> Key: BEAM-673
> URL: https://issues.apache.org/jira/browse/BEAM-673
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
> Fix For: First stable release
>
>
> In some distributed filesystems, such as HDFS, we should be able to hint to 
> Spark the preferred locations of splits.
> Here is an example of how Spark does that for Hadoop RDDs:
> https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L249



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.

2017-03-01 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891188#comment-15891188
 ] 

Amit Sela commented on BEAM-1582:
-

Looks like the flake happens when the entire input is re-read.
We inject 4 elements into Kafka before the first run, and 2 more before the 
second. When all is well, printing the number of elements read by 
SparkUnboundedSource's {{readUnboundedStream}} JavaDStream shows 4 (sometimes 1 
followed by 3) in the first run and 2 in the second, but in failures it reads 
6 in the second.
This would happen if the readers' checkpoint is not persisted for some reason, 
causing KafkaIO to fall back to the default "earliest" offset and re-read everything.
This happens even though the checkpoint interval equals the batch interval. I will 
check if there's a way to guarantee/block on checkpointing.
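To make the "default earliest" part concrete, this is the plain Kafka consumer property the reader effectively falls back to when no checkpointed offsets are found (a hypothetical illustration, not the runner's code):
{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class OffsetResetIllustration {
  /** Without a persisted reader checkpoint, this policy decides where reading starts. */
  public static Properties consumerProps() {
    Properties props = new Properties();
    // "earliest" restarts from the beginning of the topic, which would explain
    // reading all 6 elements in the second run instead of just the 2 new ones.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
  }
}
{code}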

> ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
> --
>
> Key: BEAM-1582
> URL: https://issues.apache.org/jira/browse/BEAM-1582
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
>
> See: 
> https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/
> After some digging in it appears that a second firing occurs (though only one 
> is expected) but it doesn't come from a stale state (state is empty before it 
> fires).
> Might be a retry happening for some reason, which is OK in terms of 
> fault-tolerance guarantees (at-least-once), but not so much in terms of flaky 
> tests. 
> I'm looking into this hoping to fix this ASAP.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.

2017-03-01 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1582:
---

 Summary: ResumeFromCheckpointStreamingTest flakes with what 
appears as a second firing.
 Key: BEAM-1582
 URL: https://issues.apache.org/jira/browse/BEAM-1582
 Project: Beam
  Issue Type: Bug
  Components: runner-spark
Reporter: Amit Sela
Assignee: Amit Sela


See: 
https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/

After some digging in it appears that a second firing occurs (though only one 
is expected) but it doesn't come from a stale state (state is empty before it 
fires).
Might be a retry happening for some reason, which is OK in terms of 
fault-tolerance guarantees (at-least-once), but not so much in terms of flaky 
tests. 

I'm looking into this hoping to fix this ASAP.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-111) Use SDK implementation of WritableCoder

2017-03-01 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-111.

   Resolution: Fixed
Fix Version/s: 0.6.0

> Use SDK implementation of WritableCoder
> ---
>
> Key: BEAM-111
> URL: https://issues.apache.org/jira/browse/BEAM-111
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Ismaël Mejía
>  Labels: easy, starter
> Fix For: 0.6.0
>
>
> The Spark runner currently uses it's own implementation of WritableCoder, 
> should use the one in {{io-hdfs}}.
> Remove {{org.apache.beam.runners.spark.coders.WritableCoder}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (BEAM-849) Redesign PipelineResult API

2017-03-01 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15890953#comment-15890953
 ] 

Amit Sela edited comment on BEAM-849 at 3/1/17 8:14 PM:


A continually growing file is something else, which I agree on, but in that 
case {{waitUntilFinish()}} would terminate after the file is done, right? By 
moving the WM to end-of-time? So if that happens implicitly, why call it 
explicitly?

All I'm saying is that in batch, the beginning and end are known ahead of 
execution, so blocking until termination is natural. In streaming, however, 
the end is unknown, so it's a bit awkward - some pipelines will behave like 
batch (e.g. SDF-log-tail), and some, like reading from Pubsub/Kafka, won't.
I will agree that for the sake of a unified model it makes sense, but it's 
still a bit unnatural, and that's what I think this ticket is for - to try and 
reason about this and make it "feel" more natural, no?

As for "unbounded pipelines" not being a part of the model, it's a bit 
confusing because they are all over the SDK.



was (Author: amitsela):
A continually growing file is something else, which I agree on, but in that 
case {{waitUntilFinish()}} would terminate after the file is done, right ? by 
moving the WM to end-of-time ? so if that happens implicitly, why call 
explicitly ?

All I'm saying is that in batch, beginning and end are known ahead of 
execution, so blocking until termination is natural. In streaming however, the 
end is unknown, so it's a bit awkward - some pipelines will behave the same, 
like SDF-log-tail, and some won't like reading from Pubsub/Kafka.
I will agree that for the sake of a unified model it makes sense, but still a 
bit un-natural, so that's why I think this ticket is for - to try and reason 
about this and make "feel" more natural, no ? 

As for "unbounded pipelines" not being a part of the model, it's a bit 
confusing because it's all over the SDK.


> Redesign PipelineResult API
> ---
>
> Key: BEAM-849
> URL: https://issues.apache.org/jira/browse/BEAM-849
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Pei He
>
> Current state: 
> Jira https://issues.apache.org/jira/browse/BEAM-443 addresses 
> waitUntilFinish() and cancel(). 
> However, there is additional work around PipelineResult: 
> need clearly defined contract and verification across all runners 
> need to revisit how to handle metrics/aggregators 
> need to be able to get logs



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-849) Redesign PipelineResult API

2017-03-01 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15890953#comment-15890953
 ] 

Amit Sela commented on BEAM-849:


A continually growing file is something else, which I agree on, but in that 
case {{waitUntilFinish()}} would terminate after the file is done, right? By 
moving the WM to end-of-time? So if that happens implicitly, why call it 
explicitly?

All I'm saying is that in batch, the beginning and end are known ahead of 
execution, so blocking until termination is natural. In streaming, however, 
the end is unknown, so it's a bit awkward - some pipelines will behave the 
same (e.g. SDF-log-tail), and some, like reading from Pubsub/Kafka, won't.
I will agree that for the sake of a unified model it makes sense, but it's 
still a bit unnatural, and that's what I think this ticket is for - to try and 
reason about this and make it "feel" more natural, no?

As for "unbounded pipelines" not being a part of the model, it's a bit 
confusing because they are all over the SDK.


> Redesign PipelineResult API
> ---
>
> Key: BEAM-849
> URL: https://issues.apache.org/jira/browse/BEAM-849
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Pei He
>
> Current state: 
> Jira https://issues.apache.org/jira/browse/BEAM-443 addresses 
> waitUntilFinish() and cancel(). 
> However, there is additional work around PipelineResult: 
> need clearly defined contract and verification across all runners 
> need to revisit how to handle metrics/aggregators 
> need to be able to get logs



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (BEAM-849) Redesign PipelineResult API

2017-03-01 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15890885#comment-15890885
 ] 

Amit Sela edited comment on BEAM-849 at 3/1/17 7:35 PM:


I disagree with stating that "Create.of(filename) + ParDo(tail file) + 
ParDo(process records)" in streaming leverages "low-latency"; that's very 
runner-specific, and generally inaccurate - how is sending a "filename" to 
worker(s) + data locality slower than streaming the file to processors?


was (Author: amitsela):
I disagree with stating that "Create.of(filename) + ParDo(tail file) + 
ParDo(process records)" in streaming leverages "low-latency", that's very 
runner specific, and generally inaccurate - how is sending a "filename" to 
worker/s + data locality faster than streaming the file to processors ? 

> Redesign PipelineResult API
> ---
>
> Key: BEAM-849
> URL: https://issues.apache.org/jira/browse/BEAM-849
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Pei He
>
> Current state: 
> Jira https://issues.apache.org/jira/browse/BEAM-443 addresses 
> waitUntilFinish() and cancel(). 
> However, there is additional work around PipelineResult: 
> need clearly defined contract and verification across all runners 
> need to revisit how to handle metrics/aggregators 
> need to be able to get logs



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1562) Use a "signal" to stop streaming tests as they finish.

2017-03-01 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15890889#comment-15890889
 ] 

Amit Sela commented on BEAM-1562:
-

I am using Watermarks ;-) but I persisted things to give all the information.

> Use a "signal" to stop streaming tests as they finish.
> --
>
> Key: BEAM-1562
> URL: https://issues.apache.org/jira/browse/BEAM-1562
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
>
> Streaming tests use a timeout that has to take a large enough buffer to avoid 
> slow runtimes stopping before test completes.
> We can introduce a "poison pill" based on an {{EndOfStream}} element that 
> would be counted in a Metrics/Aggregator to know all data was processed.
> Another option would be to follow the slowest WM and stop once it hits 
> end-of-time.
> This requires the Spark runner to stop blocking the execution of a streaming 
> pipeline until it's complete - which is something relevant to 
> {{PipelineResult}} (re)design that is being discussed in BEAM-849   



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-849) Redesign PipelineResult API

2017-03-01 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15890885#comment-15890885
 ] 

Amit Sela commented on BEAM-849:


I disagree with stating that "Create.of(filename) + ParDo(tail file) + 
ParDo(process records)" in streaming leverages "low-latency"; that's very 
runner-specific, and generally inaccurate - how is sending a "filename" to 
worker(s) + data locality faster than streaming the file to processors?

> Redesign PipelineResult API
> ---
>
> Key: BEAM-849
> URL: https://issues.apache.org/jira/browse/BEAM-849
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Pei He
>
> Current state: 
> Jira https://issues.apache.org/jira/browse/BEAM-443 addresses 
> waitUntilFinish() and cancel(). 
> However, there is additional work around PipelineResult: 
> need clearly defined contract and verification across all runners 
> need to revisit how to handle metrics/aggregators 
> need to be able to get logs



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-1562) Use a "signal" to stop streaming tests as they finish.

2017-03-01 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela updated BEAM-1562:

Description: 
Streaming tests use a timeout that has to take a large enough buffer to avoid 
slow runtimes stopping before test completes.

We can introduce a "poison pill" based on an {{EndOfStream}} element that would 
be counted in a Metrics/Aggregator to know all data was processed.

Another option would be to follow the slowest WM and stop once it hits 
end-of-time.

This requires the Spark runner to stop blocking the execution of a streaming 
pipeline until it's complete - which is something relevant to 
{{PipelineResult}} (re)design that is being discussed in BEAM-849   

  was:
Streaming tests use a timeout that has to take a large enough buffer to avoid 
slow runtimes stopping before test completes.

We can introduce a "poison pill" based on an {{EndOfStream}} element that would 
be counted in a Metrics/Aggregator to know all data was processed.

This requires the Spark runner to stop blocking the execution of a streaming 
pipeline until it's complete - which is something relevant to 
{{PipelineResult}} (re)design that is being discussed in BEAM-849   


> Use a "signal" to stop streaming tests as they finish.
> --
>
> Key: BEAM-1562
> URL: https://issues.apache.org/jira/browse/BEAM-1562
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
>
> Streaming tests use a timeout that has to take a large enough buffer to avoid 
> slow runtimes stopping before test completes.
> We can introduce a "poison pill" based on an {{EndOfStream}} element that 
> would be counted in a Metrics/Aggregator to know all data was processed.
> Another option would be to follow the slowest WM and stop once it hits 
> end-of-time.
> This requires the Spark runner to stop blocking the execution of a streaming 
> pipeline until it's complete - which is something relevant to 
> {{PipelineResult}} (re)design that is being discussed in BEAM-849   



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-1562) Use a "signal" to stop streaming tests as they finish.

2017-03-01 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela updated BEAM-1562:

Summary: Use a "signal" to stop streaming tests as they finish.  (was: Use 
a "poison pill" to stop streaming tests as they finish.)

> Use a "signal" to stop streaming tests as they finish.
> --
>
> Key: BEAM-1562
> URL: https://issues.apache.org/jira/browse/BEAM-1562
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
>
> Streaming tests use a timeout that has to take a large enough buffer to avoid 
> slow runtimes stopping before test completes.
> We can introduce a "poison pill" based on an {{EndOfStream}} element that 
> would be counted in a Metrics/Aggregator to know all data was processed.
> This requires the Spark runner to stop blocking the execution of a streaming 
> pipeline until it's complete - which is something relevant to 
> {{PipelineResult}} (re)design that is being discussed in BEAM-849   



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (BEAM-1562) Use a "poison pill" to stop streaming tests as they finish.

2017-03-01 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela reassigned BEAM-1562:
---

Assignee: Amit Sela

> Use a "poison pill" to stop streaming tests as they finish.
> ---
>
> Key: BEAM-1562
> URL: https://issues.apache.org/jira/browse/BEAM-1562
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
>
> Streaming tests use a timeout that has to take a large enough buffer to avoid 
> slow runtimes stopping before test completes.
> We can introduce a "poison pill" based on an {{EndOfStream}} element that 
> would be counted in a Metrics/Aggregator to know all data was processed.
> This requires the Spark runner to stop blocking the execution of a streaming 
> pipeline until it's complete - which is something relevant to 
> {{PipelineResult}} (re)design that is being discussed in BEAM-849   



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1576) Use UnsupportedSideInputReader in GroupAlsoByWindowEvaluatorFactory.

2017-03-01 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1576.
-
   Resolution: Fixed
Fix Version/s: 0.6.0

> Use UnsupportedSideInputReader in GroupAlsoByWindowEvaluatorFactory.
> 
>
> Key: BEAM-1576
> URL: https://issues.apache.org/jira/browse/BEAM-1576
> Project: Beam
>  Issue Type: Bug
>  Components: runner-direct
>Reporter: Amit Sela
>Assignee: Amit Sela
>Priority: Minor
> Fix For: 0.6.0
>
>
> Now that {{runners-core}} has an {{UnsupportedSideInputReader}} we can use it 
> in Direct runner too.
> I didn't see any other refactors that I could do around this across the 
> project so it's tagged on the direct runner only.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1576) Use UnsupportedSideInputReader in GroupAlsoByWindowEvaluatorFactory.

2017-02-28 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1576:
---

 Summary: Use UnsupportedSideInputReader in 
GroupAlsoByWindowEvaluatorFactory.
 Key: BEAM-1576
 URL: https://issues.apache.org/jira/browse/BEAM-1576
 Project: Beam
  Issue Type: Bug
  Components: runner-direct
Reporter: Amit Sela
Assignee: Amit Sela
Priority: Minor


Now that {{runners-core}} has an {{UnsupportedSideInputReader}} we can use it 
in Direct runner too.
I didn't see any other refactors that I could do around this across the project 
so it's tagged on the direct runner only.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-920) Support triggers, panes and watermarks.

2017-02-28 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-920.

   Resolution: Fixed
Fix Version/s: 0.6.0

> Support triggers, panes and watermarks.
> ---
>
> Key: BEAM-920
> URL: https://issues.apache.org/jira/browse/BEAM-920
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
> Fix For: 0.6.0
>
>
> Implement event-time based aggregation using triggers, panes and watermarks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1556) Spark executors need to register IO factories

2017-02-28 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888512#comment-15888512
 ] 

Amit Sela commented on BEAM-1556:
-

Agree, unblock users first.
[~jbonofre] I can help with this if you want, ping me. 

> Spark executors need to register IO factories
> -
>
> Key: BEAM-1556
> URL: https://issues.apache.org/jira/browse/BEAM-1556
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Frances Perry
>Assignee: Jean-Baptiste Onofré
>
> The Spark executors need to call IOChannelUtils.registerIOFactories(options) 
> in order to support GCS file and make the default WordCount example work.
> Context in this thread: 
> https://lists.apache.org/thread.html/469a139c9eb07e64e514cdea42ab8000678ab743794a090c365205d7@%3Cuser.beam.apache.org%3E



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1556) Spark executors need to register IO factories

2017-02-28 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887964#comment-15887964
 ] 

Amit Sela commented on BEAM-1556:
-

My line of thought about this being in the SDK (or better, the Runner API) is 
that the runner would have to initialize the registration for every instance, 
mostly workers (the {{PipelineRunner}} implementation would probably take care 
of it for the "Driver" instance).
Since not all {{DoFn}}s require this, and not all readers/writers do either, 
it's either initializing all the time (whether needed or not) or having the 
runner patch things up for every new use case: Read, Write, DoFn...
I'm not sure I'm going to like the following suggestion (fighting with myself 
a bit here), but how about a {{FileSystemContext}}? The runner would have to 
initialize it and pass it on to the SDK.

Not sure here.. thoughts?

> Spark executors need to register IO factories
> -
>
> Key: BEAM-1556
> URL: https://issues.apache.org/jira/browse/BEAM-1556
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Frances Perry
>Assignee: Jean-Baptiste Onofré
>
> The Spark executors need to call IOChannelUtils.registerIOFactories(options) 
> in order to support GCS file and make the default WordCount example work.
> Context in this thread: 
> https://lists.apache.org/thread.html/469a139c9eb07e64e514cdea42ab8000678ab743794a090c365205d7@%3Cuser.beam.apache.org%3E



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1565) Update Spark runner PostCommit Jenkins job.

2017-02-27 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1565:
---

 Summary: Update Spark runner PostCommit Jenkins job.
 Key: BEAM-1565
 URL: https://issues.apache.org/jira/browse/BEAM-1565
 Project: Beam
  Issue Type: Bug
  Components: runner-spark
Reporter: Amit Sela
Priority: Minor


Should only activate {{runnable-on-service}} profile (and 
{{streaming-runnable-on-service}} once enabled) and remove 
{{-Dspark.port.maxRetries=64}}. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1562) Use a "poison pill" to stop streaming tests as they finish.

2017-02-27 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1562:
---

 Summary: Use a "poison pill" to stop streaming tests as they 
finish.
 Key: BEAM-1562
 URL: https://issues.apache.org/jira/browse/BEAM-1562
 Project: Beam
  Issue Type: Improvement
  Components: runner-spark
Reporter: Amit Sela


Streaming tests use a timeout that has to take a large enough buffer to avoid 
slow runtimes stopping before test completes.

We can introduce a "poison pill" based on an {{EndOfStream}} element that would 
be counted in a Metrics/Aggregator to know all data was processed.

This requires the Spark runner to stop blocking the execution of a streaming 
pipeline until it's complete - which is something relevant to 
{{PipelineResult}} (re)design that is being discussed in BEAM-849   
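
A minimal sketch of the idea (the sentinel value and all names are hypothetical): a {{DoFn}} that counts the poison pill in a Beam metric, which the test can then poll to decide when to stop the pipeline:
{code}
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

/** Counts hypothetical end-of-stream markers so a test knows when all data was processed. */
public class CountEndOfStream extends DoFn<String, String> {

  private static final String END_OF_STREAM = "EOS"; // hypothetical sentinel injected by the test

  private final Counter endOfStreamSeen = Metrics.counter(CountEndOfStream.class, "endOfStreamSeen");

  @ProcessElement
  public void processElement(ProcessContext c) {
    if (END_OF_STREAM.equals(c.element())) {
      endOfStreamSeen.inc(); // the test polls this counter via the pipeline's MetricResults
    } else {
      c.output(c.element()); // pass regular elements through
    }
  }
}
{code}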



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1564) Support streaming side-inputs in the Spark runner.

2017-02-27 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1564:
---

 Summary: Support streaming side-inputs in the Spark runner.
 Key: BEAM-1564
 URL: https://issues.apache.org/jira/browse/BEAM-1564
 Project: Beam
  Issue Type: Bug
  Components: runner-spark
Reporter: Amit Sela
Assignee: Amit Sela


Currently an unbounded PCollection can use side-inputs from a bounded 
PCollection but there's no proper support for streaming sources that update the 
side-input.  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1563) Flatten Spark runner libraries.

2017-02-27 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1563:
---

 Summary: Flatten Spark runner libraries.
 Key: BEAM-1563
 URL: https://issues.apache.org/jira/browse/BEAM-1563
 Project: Beam
  Issue Type: Improvement
  Components: runner-spark
Reporter: Amit Sela
Assignee: Amit Sela


Flatten Spark runner libraries into a single package (or two) so that 
everything is private.
See [~kenn] comment: https://github.com/apache/beam/pull/2050



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1556) Spark executors need to register IO factories

2017-02-26 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15884602#comment-15884602
 ] 

Amit Sela commented on BEAM-1556:
-

[~frances] I wonder if this is an issue with other runners as well, since I 
don't see any runner (apart from {{DataflowRunner}}) calling {{IOChannelUtils}} 
in its code.

As for the Spark runner, it looks like {{SourceRDD.Bounded}} needs to 
initialize the IO factory registration on the workers, using the deserialized 
PipelineOptions as Frances mentioned. I wouldn't do this as part of 
{{SparkRuntimeContext#deserializePipelineOptions(String)}} since the 
registration is only needed when reading from a FileBasedSource (or writing to 
a Sink).

I wonder if this initialization shouldn't be part of {{FileBasedReader}} 
construction?
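
For illustration only, a rough sketch (helper name and placement are hypothetical; the import path assumes the SDK's {{IOChannelUtils}} from the thread above) of registering the factories once per worker JVM from the deserialized options before a reader is constructed:
{code}
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.IOChannelUtils;

/** Hypothetical helper: register IO factories once per executor JVM before any file-based read. */
public class WorkerIOFactories {

  private static volatile boolean registered = false;

  public static void ensureRegistered(PipelineOptions options) {
    if (!registered) {
      synchronized (WorkerIOFactories.class) {
        if (!registered) {
          // Same call the driver makes; executors need it too, since the static
          // registration does not travel with the serialized functions.
          IOChannelUtils.registerIOFactories(options);
          registered = true;
        }
      }
    }
  }
}
{code}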

> Spark executors need to register IO factories
> -
>
> Key: BEAM-1556
> URL: https://issues.apache.org/jira/browse/BEAM-1556
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Frances Perry
>Assignee: Jean-Baptiste Onofré
>
> The Spark executors need to call IOChannelUtils.registerIOFactories(options) 
> in order to support GCS file and make the default WordCount example work.
> Context in this thread: 
> https://lists.apache.org/thread.html/469a139c9eb07e64e514cdea42ab8000678ab743794a090c365205d7@%3Cuser.beam.apache.org%3E



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-982) Document spark configuration update

2017-02-24 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15883673#comment-15883673
 ] 

Amit Sela commented on BEAM-982:


[~jbonofre] {{spark.serializer}} is already hard-coded into the runner, and 
takes highest precedence.
I don't believe {{spark.kryoserializer.buffer.max}} was related specifically to 
your problem.
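
For reference, the hard-coding amounts to roughly the following (a sketch, not the runner's actual code):
{code}
import org.apache.spark.SparkConf;

public class SerializerConfSketch {
  public static SparkConf withKryo(SparkConf conf) {
    // Enforced by the runner regardless of spark-defaults.conf; buffer sizing such as
    // spark.kryoserializer.buffer.max is left to the user's Spark configuration.
    return conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
  }
}
{code}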

> Document spark configuration update
> ---
>
> Key: BEAM-982
> URL: https://issues.apache.org/jira/browse/BEAM-982
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Jean-Baptiste Onofré
>Priority: Minor
>
> When using {{spark-submit}} to submit a pipeline to a spark cluster, I had to 
> change the {{conf/spark-defaults.conf}} with the following:
> {code}
> spark.serializer org.apache.spark.serializer.KryoSerializer
> spark.kryoserializer.buffer.max 1024
> {code}
> I think it would make sense to add a note in the Spark README or website 
> about this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (BEAM-982) Document spark configuration update

2017-02-24 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15883673#comment-15883673
 ] 

Amit Sela edited comment on BEAM-982 at 2/24/17 10:49 PM:
--

[~jbonofre] {{spark.serializer}} is already hard-coded into the runner, and 
takes highest precedence.
I don't believe {{spark.kryoserializer.buffer.max}} was related specifically to 
your problem.

Any reason not to close this ?


was (Author: amitsela):
[~jbonofre] {{spark.serializer}} is already hard-coded into the runner, and 
takes highest precedence.
I don't believe {{spark.kryoserializer.buffer.max}} was related specifically to 
your problem.

> Document spark configuration update
> ---
>
> Key: BEAM-982
> URL: https://issues.apache.org/jira/browse/BEAM-982
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Jean-Baptiste Onofré
>Priority: Minor
>
> When using {{spark-submit}} to submit a pipeline to a spark cluster, I had to 
> change the {{conf/spark-defaults.conf}} with the following:
> {code}
> spark.serializer org.apache.spark.serializer.KryoSerializer
> spark.kryoserializer.buffer.max 1024
> {code}
> I think it would make sense to add a note in the Spark README or website 
> about this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (BEAM-229) Add support for additional Spark configuration via PipelineOptions

2017-02-24 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela closed BEAM-229.
--
   Resolution: Won't Fix
Fix Version/s: Not applicable

Spark configurations can be passed as system properties or via the Spark 
configuration as part of the deployment / spark-defaults.conf.

I don't see a reason to mix them into {{PipelineOptions}}.

> Add support for additional Spark configuration via PipelineOptions
> --
>
> Key: BEAM-229
> URL: https://issues.apache.org/jira/browse/BEAM-229
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
> Fix For: Not applicable
>
>
> Provide support for at least some of the Spark configurations via 
> PipelineOptions - memory, cores, etc.
> Need to pass PipelineOptions to 
> https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
>  where the SparkConf is created. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-981) Not possible to directly submit a pipeline on spark cluster

2017-02-24 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15883644#comment-15883644
 ] 

Amit Sela commented on BEAM-981:


[~jbonofre] any news on this one ?

> Not possible to directly submit a pipeline on spark cluster
> ---
>
> Key: BEAM-981
> URL: https://issues.apache.org/jira/browse/BEAM-981
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Affects Versions: 0.3.0-incubating, 0.4.0
>Reporter: Jean-Baptiste Onofré
>
> It's not possible to directly run a pipeline on the spark runner (for 
> instance using {{mvn exec:java}}. It fails with:
> {code}
> [appclient-register-master-threadpool-0] INFO 
> org.apache.spark.deploy.client.AppClient$ClientEndpoint - Connecting to 
> master spark://10.200.118.197:7077...
> [shuffle-client-0] ERROR org.apache.spark.network.client.TransportClient - 
> Failed to send RPC 6813731522650020739 to /10.200.118.197:7077: 
> java.lang.AbstractMethodError: 
> org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
> java.lang.AbstractMethodError: 
> org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
> at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:73)
> at 
> io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:107)
> at 
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:820)
> at 
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:733)
> at 
> io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:111)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:748)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:740)
> at 
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:826)
> at 
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:733)
> at 
> io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:284)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:748)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:740)
> at 
> io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
> at 
> io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1101)
> at 
> io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1148)
> at 
> io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1090)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.safeExecute(SingleThreadEventExecutor.java:451)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:401)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877)
> at java.lang.Thread.run(Thread.java:745)
> [appclient-register-master-threadpool-0] WARN 
> org.apache.spark.deploy.client.AppClient$ClientEndpoint - Failed to connect 
> to master 10.200.118.197:7077
> java.io.IOException: Failed to send RPC 6813731522650020739 to 
> /10.200.118.197:7077: java.lang.AbstractMethodError: 
> org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
> at 
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
> at 
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
> at 
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:514)
> at 
> io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:507)
> at 
> io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:486)
> at 
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:427)
> at 
> io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:129)
> at 
> io.netty.channel.AbstractChannelHandlerContext.notifyOutboundHandlerException(AbstractChannelHandlerContext.java:845)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:750)
> 

[jira] [Closed] (BEAM-18) Add support for new Beam Sink API

2017-02-24 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-18?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela closed BEAM-18.
-
   Resolution: Won't Fix
Fix Version/s: Not applicable

Currently, Beam Sinks are implemented via {{ParDo}}s, and the Spark runner will 
trigger an action-less leaf anyway, so I don't see a reason to keep this ticket.

If anything changes in the Sink API this could be revisited.

> Add support for new Beam Sink API
> -
>
> Key: BEAM-18
> URL: https://issues.apache.org/jira/browse/BEAM-18
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
> Fix For: Not applicable
>
>
> Support Beam Sinks via Spark {{foreach}} API, Spark's native sinks (if / 
> where) possible.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (BEAM-668) Reinstate runner direct translation for TextIO and AvroIO once Beam SDK supports hdfs.

2017-02-24 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela closed BEAM-668.
--
   Resolution: Won't Fix
Fix Version/s: Not applicable

It doesn't seem maintainable to directly translate IOs in general, because Beam 
and Spark are not in sync.

> Reinstate runner direct translation for TextIO and AvroIO once Beam SDK 
> supports hdfs.
> --
>
> Key: BEAM-668
> URL: https://issues.apache.org/jira/browse/BEAM-668
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
> Fix For: Not applicable
>
>
> The runner has an implementation of a direct translation (not with 
> Read.Bounded) for TextIO and AvroIO.
> Those cannot be used properly until BEAM-59 is resolved and hdfs is properly 
> supported.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-848) A better shuffle after reading from within mapWithState.

2017-02-24 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela updated BEAM-848:
---
Issue Type: Improvement  (was: Bug)

> A better shuffle after reading from within mapWithState.
> 
>
> Key: BEAM-848
> URL: https://issues.apache.org/jira/browse/BEAM-848
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
>
> The SparkRunner uses {{mapWithState}} to read and manage CheckpointMarks, and 
> this stateful operation will be followed by a shuffle: 
> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala#L159
> Since the stateful read maps "splitSource" -> "partition of a list of read 
> values", the following shuffle won't benefit in any way (the list of read 
> values has not been flatMapped yet). In order to avoid the shuffle we need to 
> set the input RDD's ({{SourceRDD.Unbounded}}) partitioner to be a default 
> {{HashPartitioner}}, since {{mapWithState}} would use the same partitioner 
> and will skip the shuffle if the partitioners match.
> It would be wise to shuffle the read values _after_ the flatMap.
> I will break this into two tasks:
> # Set default-partitioner to the input RDD.
> # Shuffle (using Coders) the input.
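
A rough sketch of the partitioner-matching idea against the Spark 2.x Java streaming API (key/value types, the kept state, and all names are hypothetical); the input RDDs must already be partitioned by the same partitioner for the shuffle to actually be skipped:
{code}
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

public class PartitionerMatchingSketch {

  /** Keeps a per-split read count as state and emits the records unchanged. */
  public static JavaDStream<Tuple2<Integer, byte[]>> readWithState(
      JavaPairDStream<Integer, byte[]> readsBySplit, int numPartitions) {

    // The same partitioner should also be set on the input RDDs (the "splitSource" side)
    // so that mapWithState sees matching partitioning and skips its shuffle.
    HashPartitioner partitioner = new HashPartitioner(numPartitions);

    Function3<Integer, Optional<byte[]>, State<Long>, Tuple2<Integer, byte[]>> mappingFn =
        (splitId, record, state) -> {
          long seen = state.exists() ? state.get() : 0L;
          state.update(seen + 1); // per-split state, checkpointed by Spark
          byte[] bytes = record.isPresent() ? record.get() : new byte[0];
          return new Tuple2<>(splitId, bytes);
        };

    return readsBySplit.mapWithState(
        StateSpec.function(mappingFn).partitioner(partitioner));
  }
}
{code}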



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1526) Flakes in Spark runner WatermarkTest.testInDoFn

2017-02-22 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1526.
-
   Resolution: Fixed
Fix Version/s: 0.6.0

> Flakes in Spark runner WatermarkTest.testInDoFn
> ---
>
> Key: BEAM-1526
> URL: https://issues.apache.org/jira/browse/BEAM-1526
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Kenneth Knowles
>Assignee: Thomas Groh
> Fix For: 0.6.0
>
>
> https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7612/org.apache.beam$beam-runners-spark/testReport/junit/org.apache.beam.runners.spark/WatermarkTest/testInDoFn/
> We should probably add {{@Ignore}} to the test for now, and remove it after 
> the next PR changes everything, and so on.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1512) Optimize leaf transforms materialization

2017-02-20 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1512.
-
   Resolution: Fixed
Fix Version/s: 0.6.0

> Optimize leaf transforms materialization
> 
>
> Key: BEAM-1512
> URL: https://issues.apache.org/jira/browse/BEAM-1512
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Aviem Zur
> Fix For: 0.6.0
>
>
> Optimize leaf materialization in {{EvaluationContext}} Use register for 
> DStream leaves and an empty {{foreachPartition}} for other leaves instead of 
> the current {{count()}} which adds overhead.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1492) Avoid potential issue in ASM 5.0

2017-02-17 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872065#comment-15872065
 ] 

Amit Sela commented on BEAM-1492:
-

[~kenn] The Spark and Apex runners both use Kryo 2.x, which transitively relies 
on ASM 4.0. Gearpump uses {{akka-kryo-serialization}}, which depends on Kryo 3.x 
and so transitively relies on ASM 5.x.
This could become a problem for Spark runner V2 once that happens, though, 
since Spark's branch 2.1 depends on Kryo 3.0.3.

I'll keep an eye on it!

> Avoid potential issue in ASM 5.0
> 
>
> Key: BEAM-1492
> URL: https://issues.apache.org/jira/browse/BEAM-1492
> Project: Beam
>  Issue Type: Task
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Amit Sela
>
> There is a suspected bug in asm 5.0 that is considered the likely root cause 
> of a [bug 
> sbt-assembly|https://github.com/sbt/sbt-assembly/issues/205#issuecomment-279964607]
>  that carried over to [GEARPUMP-236]. I have not found a direct reference to 
> what the issue is, precisely, but the dependency effect of this is extremely 
> small and these are libraries that are useful to keep current. And if/when 
> Gearpump runner lands on master this will avoid any diamond dep issues.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-774) Implement Metrics support for Spark runner

2017-02-15 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-774.

   Resolution: Fixed
Fix Version/s: 0.6.0

> Implement Metrics support for Spark runner
> --
>
> Key: BEAM-774
> URL: https://issues.apache.org/jira/browse/BEAM-774
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-spark
>Reporter: Ben Chambers
>Assignee: Aviem Zur
> Fix For: 0.6.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-1470) A quiet logger for testing

2017-02-13 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1470.
-
   Resolution: Fixed
Fix Version/s: 0.6.0

> A quiet logger for testing
> --
>
> Key: BEAM-1470
> URL: https://issues.apache.org/jira/browse/BEAM-1470
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
>Priority: Minor
> Fix For: 0.6.0
>
>
> Make the test logger quiet and clean to avoid flooding build logs and to 
> allow better visibility in case something breaks, since the current logging 
> is very crowded.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1470) A quiet logger for testing

2017-02-13 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1470:
---

 Summary: A quiet logger for testing
 Key: BEAM-1470
 URL: https://issues.apache.org/jira/browse/BEAM-1470
 Project: Beam
  Issue Type: Improvement
  Components: runner-spark
Reporter: Amit Sela
Assignee: Amit Sela
Priority: Minor


Make the test logger quiet and clean to avoid flooding build logs and to allow 
better visibility in case something breaks, since the current logging is very 
crowded.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

