[jira] [Commented] (BEAM-5519) Spark Streaming Duplicated Encoding/Decoding Effort
[ https://issues.apache.org/jira/browse/BEAM-5519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16634849#comment-16634849 ] Amit Sela commented on BEAM-5519: - [~winkelman.kyle] you bring up a good point. IIRC we did all this mess to guarantee that in case a shuffle is required by Spark, we control it (initiate it), and it applies to RDDs containing serialized data only. This might have been _before_ we got the "force default partitioner" in place, or it was a mixed process, I'm not sure. You can test your change (in streaming, which uses {{UpdateStateByKey}} and so creates shuffles etc.) to make sure that no shuffle occurs on RDDs containing deserialized data. In addition, you can use a type that is not Kryo-serializable (if the runner still defaults to Kryo underneath) and make sure it doesn't fail. Hope that helps! > Spark Streaming Duplicated Encoding/Decoding Effort > --- > > Key: BEAM-5519 > URL: https://issues.apache.org/jira/browse/BEAM-5519 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Kyle Winkelman >Assignee: Kyle Winkelman >Priority: Major > Labels: spark, spark-streaming > Time Spent: 50m > Remaining Estimate: 0h > > When using the SparkRunner in streaming mode, there is a call to groupByKey > followed by a call to updateStateByKey. BEAM-1815 fixed an issue where this > used to cause 2 shuffles, but it still causes 2 encode/decode cycles. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
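To make the "2 encode/decode cycles" in BEAM-5519 concrete, here is a toy model in plain Java. It is only a sketch under stated assumptions: {{CountingCoder}}, {{shuffleProneStep}}, {{perStepCoding}} and {{sharedCoding}} are hypothetical names invented for illustration, not Beam or Spark runner APIs, and the "steps" merely stand in for groupByKey and updateStateByKey operating on serialized bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model only: none of these names come from Beam or the Spark runner. */
public class EncodeDecodeCycles {

    /** Hypothetical coder that counts how often it encodes and decodes. */
    public static final class CountingCoder {
        public int encodes = 0;
        public int decodes = 0;

        public byte[] encode(String value) {
            encodes++;
            return value.getBytes(StandardCharsets.UTF_8);
        }

        public String decode(byte[] bytes) {
            decodes++;
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    private static List<byte[]> encodeAll(List<String> values, CountingCoder coder) {
        List<byte[]> out = new ArrayList<>();
        for (String v : values) {
            out.add(coder.encode(v));
        }
        return out;
    }

    private static List<String> decodeAll(List<byte[]> bytes, CountingCoder coder) {
        List<String> out = new ArrayList<>();
        for (byte[] b : bytes) {
            out.add(coder.decode(b));
        }
        return out;
    }

    /** Stand-in for a step that may shuffle and therefore only handles bytes. */
    private static List<byte[]> shuffleProneStep(List<byte[]> serialized) {
        return new ArrayList<>(serialized);
    }

    /** Each step wraps itself in its own encode/decode pair: two cycles per element. */
    public static List<String> perStepCoding(List<String> input, CountingCoder coder) {
        List<String> grouped = decodeAll(shuffleProneStep(encodeAll(input, coder)), coder);
        return decodeAll(shuffleProneStep(encodeAll(grouped, coder)), coder);
    }

    /** Data stays serialized across both steps: one cycle per element. */
    public static List<String> sharedCoding(List<String> input, CountingCoder coder) {
        List<byte[]> grouped = shuffleProneStep(encodeAll(input, coder));
        return decodeAll(shuffleProneStep(grouped), coder);
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c");
        CountingCoder perStep = new CountingCoder();
        perStepCoding(input, perStep);
        CountingCoder shared = new CountingCoder();
        sharedCoding(input, shared);
        System.out.println("per-step: " + perStep.encodes + " encodes, " + perStep.decodes + " decodes");
        System.out.println("shared:   " + shared.encodes + " encodes, " + shared.decodes + " decodes");
    }
}
```

In both variants the bytes-only step never sees a deserialized value, which is the property the comment asks to verify. Whether the real runner can keep data serialized between the two operations is exactly what the issue is about, and this sketch does not answer that.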
[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module
[ https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976280#comment-15976280 ]

Amit Sela commented on BEAM-1789:
---------------------------------

I'd start with something less complicated, such as:
{{Repeatedly.forever(AfterPane.elementCountAtLeast(1)).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes()}}
This should fire for each update, with 1 day lateness and discarding panes.

> window can't not use in spark cluster module
> ---------------------------------------------
>
>                 Key: BEAM-1789
>                 URL: https://issues.apache.org/jira/browse/BEAM-1789
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: tianyou
>            Assignee: Amit Sela
>
> I use Beam on a Spark cluster. The application is below.
>
> SparkPipelineOptions options =
>     PipelineOptionsFactory.as(SparkPipelineOptions.class);
> options.setRunner(SparkRunner.class);
> options.setEnableSparkMetricSinks(false);
> options.setStreaming(true);
> options.setSparkMaster("spark://10.100.124.205:6066");
> options.setAppName("Beam App Spark"+new Random().nextFloat());
> options.setJobName("Beam Job Spark"+new Random().nextFloat());
> System.out.println("App Name:"+options.getAppName());
> System.out.println("Job Name:"+options.getJobName());
> options.setMaxRecordsPerBatch(10L);
>
> // PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
>
> // Duration size = Duration.standardMinutes(4);
> long duration = 60;
> if(args!=null && args.length==1){
>     duration = Integer.valueOf(args[0]);
> }
> Duration size = Duration.standardSeconds(duration);
> System.out.println("Time window: ["+duration+"] seconds");
> Window.Bound<KV<String, String>> fixWindow =
>     Window.<KV<String, String>> into(
>         FixedWindows.of(size)
>     );
>
> String kafkaAddress = "10.100.124.208:9093";
> // String kafkaAddress = "192.168.100.212:9092";
>
> Map<String, Object> kfConsunmerConf = new HashMap<String, Object>();
> kfConsunmerConf.put("auto.offset.reset", "latest");
> PCollection<String> kafkaJsonPc = p.apply(KafkaIO.<String, String>
>     read()
>     .withBootstrapServers(kafkaAddress)
>     .withTopics(ImmutableList.of("wypxx1"))
>     .withKeyCoder(StringUtf8Coder.of())
>     .withValueCoder(StringUtf8Coder.of())
>     .updateConsumerProperties(kfConsunmerConf)
>     .withoutMetadata()
> ).apply(Values.<String> create());
>
> PCollection<KV<String, String>> totalPc = kafkaJsonPc.apply(
>     "count line",
>     ParDo.of(new DoFn<String, KV<String, String>>() {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             String line = c.element();
>             Instant is = c.timestamp();
>             if(line.length()>2)
>                 line = line.substring(0,2);
>             System.out.println(line + " " + is.toString());
>             c.output(KV.of(line, line));
>         }
>     })
> );
>
> PCollection<KV<String, Iterable<String>>> itPc =
>     totalPc.apply(fixWindow).apply(
>         "group by appKey",
>         GroupByKey.create()
>     );
> itPc.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         KV<String, Iterable<String>> keyIt = c.element();
>         String key = keyIt.getKey();
>         Iterable<String> itb = keyIt.getValue();
>         Iterator<String> it = itb.iterator();
>         StringBuilder sb = new StringBuilder();
>         sb.append(key).append(":[");
>         while(it.hasNext()){
>             sb.append(it.next()).append(",");
>         }
>         String str = sb.toString();
>         str = str.substring(0,str.length() -1) + "]";
>         System.out.println(str);
>         String filePath = "/data/wyp/sparktest.txt";
>         String line = "word-->["+key+"]total count="+str+"--->time+"+c.timestamp().toString();
>         System.out.println("writefile->"+line);
>         FileUtil.write(filePath, line, true, true);
>     }
> }));
>
> p.run().waitUntilFinish();
>
> When I submit the application to the Spark cluster, I can see the log output of the
> totalPc PCollection in the Spark UI, but after one minute I still can't see any log
> output of the itPc PCollection.
> In Spark local mode it works well.
> Please help me resolve this problem. Thanks!
[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module
[ https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974601#comment-15974601 ] Amit Sela commented on BEAM-1789: - Thanks for clearing this for me, now I can pinpoint the issue. Starting from {{0.6.0}} the Spark runner supports streaming (windows, triggers, etc.) but we're still experimenting with it. Having said that, I don't think what you're experiencing is necessarily an issue, but rather a behaviour that might be related to the watermark and the {{DefaultTrigger}} you're using (by default). In local mode, propagation of the watermark is probably faster and so the difference from cluster mode. You could try using a simple "X seconds delay" watermark when reading from Kafka. Let me know if it works for you.
[jira] [Comment Edited] (BEAM-375) HadoopIO and runners-spark conflict with hadoop.version
[ https://issues.apache.org/jira/browse/BEAM-375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15967634#comment-15967634 ] Amit Sela edited comment on BEAM-375 at 4/13/17 2:16 PM: - What do you mean by "blocking"? Do you mean the code execution is blocking? was (Author: amitsela): What do you mean by "blocking" ? > HadoopIO and runners-spark conflict with hadoop.version > --- > > Key: BEAM-375 > URL: https://issues.apache.org/jira/browse/BEAM-375 > Project: Beam > Issue Type: Bug > Components: sdk-java-extensions >Reporter: Pei He >Assignee: Pei He > > HadoopIO currently uses 2.7.0 and runners-spark uses 2.2.0 for hadoop-client, > hadoop-common. > From [~amitsela] > "Spark can be built against different hadoop versions, but the release in > maven central is a 2.2.0 build (latest)." > For HadoopIO, I don't know why 2.7.0 was picked in the first place. I can check > if it will work with 2.2.0. > I am creating this issue because I think there is a general question here. > In principle, HadoopIO and other SDK sources should work with any runner. > But when one set of runners requires version A and another set of runners > requires version B, we will need a general solution for it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-375) HadoopIO and runners-spark conflict with hadoop.version
[ https://issues.apache.org/jira/browse/BEAM-375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15967634#comment-15967634 ] Amit Sela commented on BEAM-375: What do you mean by "blocking" ?
[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module
[ https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15965633#comment-15965633 ] Amit Sela commented on BEAM-1789: - Sorry, I meant {{stderr}}, {{stdout}} never shows anything in Spark for some reason, and Systemout's are presented in {{stderr}} as well.
[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module
[ https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15965498#comment-15965498 ] Amit Sela commented on BEAM-1789: - I didn't get a chance to run this on cluster, but if I understand correctly the issue your describing is that you expect you {{System.out.println}} to show in the logs and it doesn't, correct ? If so, in cluster (unlike local) you need to go through the Spark UI and look in the executors tab for {{stdout}}.
[jira] [Commented] (BEAM-1920) Add Spark 2.x support in Spark runner
[ https://issues.apache.org/jira/browse/BEAM-1920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15962076#comment-15962076 ] Amit Sela commented on BEAM-1920: - Sweet! I'm considering Spark 2.x as the default and {{1.6.3}} as the profile-enabled version, because we currently have many issues with flaky tests that I believe should be resolved by the bug fixes introduced in Spark {{2.0.0}} and {{2.0.1}}. The only con I can think of is that a Spark 2.x dependency would provide an easy path to new, incompatible APIs in Spark, so we'd have to think carefully about how we test for changes that break {{1.6.3}}. > Add Spark 2.x support in Spark runner > - > > Key: BEAM-1920 > URL: https://issues.apache.org/jira/browse/BEAM-1920 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Jean-Baptiste Onofré >Assignee: Jean-Baptiste Onofré > > I have a branch working with both Spark 1 and Spark 2 backend. > I'm preparing a pull request about that. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-1737) Implement a Single-output ParDo as a Multi-output ParDo with a single output
[ https://issues.apache.org/jira/browse/BEAM-1737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Amit Sela resolved BEAM-1737.
-----------------------------
       Resolution: Fixed
    Fix Version/s: First stable release

> Implement a Single-output ParDo as a Multi-output ParDo with a single output
> ----------------------------------------------------------------------------
>
>                 Key: BEAM-1737
>                 URL: https://issues.apache.org/jira/browse/BEAM-1737
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Thomas Groh
>            Assignee: Amit Sela
>            Priority: Minor
>             Fix For: First stable release
>
> This is the cause of having a separate path and implementation for
> single-output ParDos, even though both go through the same translator.
> Partial stacktrace:
> Tests run: 9, Failures: 0, Errors: 6, Skipped: 0, Time elapsed: 5.946 sec <<< FAILURE! - in org.apache.beam.runners.spark.translation.streaming.CreateStreamTest
> testLateDataAccumulating(org.apache.beam.runners.spark.translation.streaming.CreateStreamTest) Time elapsed: 3.593 sec <<< ERROR!
> java.lang.RuntimeException:
> java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
> org.apache.beam.runners.spark.translation.EvaluationContext
> Serialization stack:
>     - object not serializable (class: org.apache.beam.runners.spark.translation.EvaluationContext, value: org.apache.beam.runners.spark.translation.EvaluationContext@a8c55d7)
>     - field (class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1, name: val$context, type: class org.apache.beam.runners.spark.translation.EvaluationContext)
>     - object (class org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1, org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1@44f50940)
>     - field (class: org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, name: transformFunc$3, type: interface org.apache.spark.api.java.function.Function)
>     - object (class org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, )
>     - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, name: cleanedF$2, type: interface scala.Function1)
>     - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, )
>     - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, name: cleanedF$3, type: interface scala.Function2)
>     - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, )
>     - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
>     - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
> 0 checkpoint files
> ])
>
>     at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
>     at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
>     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
>     at org.apache.beam.runners.spark.TestSparkRunner.awaitWatermarksOrTimeout(TestSparkRunner.java:195)
>     at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:127)
>     at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:82)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
> ...

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
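The serialization stack in BEAM-1737 points at an anonymous class holding a {{val$context}} field of a non-serializable type. The mechanism can be reproduced with plain Java serialization, as in the minimal sketch below. Everything in it is an assumption made for illustration: {{Context}}, {{SerFn}}, {{capturing}} and {{extracting}} are hypothetical names, not the Spark runner's classes, and copying the needed value out of the context is just one common way around the problem, not necessarily how the issue was fixed.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical reproduction; these are not the Spark runner's classes. */
public class CapturedContextDemo {

    /** Stand-in for a context object that does not implement Serializable. */
    public static final class Context {
        public String stepName() {
            return "step";
        }
    }

    /** A function type that must be serializable, as checkpointed functions are. */
    public interface SerFn extends Serializable {
        String apply(String in);
    }

    /** The anonymous class captures the whole context in a synthetic field. */
    public static SerFn capturing(final Context context) {
        return new SerFn() {
            @Override
            public String apply(String in) {
                return context.stepName() + ":" + in;
            }
        };
    }

    /** Only a serializable String is captured; the context is read up front. */
    public static SerFn extracting(Context context) {
        final String name = context.stepName();
        return new SerFn() {
            @Override
            public String apply(String in) {
                return name + ":" + in;
            }
        };
    }

    /** Returns false if Java serialization rejects the object graph. */
    public static boolean isJavaSerializable(Object o) {
        try {
            ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Context context = new Context();
        System.out.println("capturing:  " + isJavaSerializable(capturing(context)));
        System.out.println("extracting: " + isJavaSerializable(extracting(context)));
    }
}
```

Both functions behave identically when called. They differ only in what the anonymous class drags along when something such as DStream checkpointing tries to serialize it.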
[jira] [Commented] (BEAM-981) Not possible to directly submit a pipeline on spark cluster
[ https://issues.apache.org/jira/browse/BEAM-981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957480#comment-15957480 ]

Amit Sela commented on BEAM-981:
--------------------------------

[~iemejia] would you mind taking a look at this one? We should be able to submit programmatically to a Spark cluster.

> Not possible to directly submit a pipeline on spark cluster
> -----------------------------------------------------------
>
>                 Key: BEAM-981
>                 URL: https://issues.apache.org/jira/browse/BEAM-981
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 0.3.0-incubating, 0.4.0
>            Reporter: Jean-Baptiste Onofré
>
> It's not possible to directly run a pipeline on the spark runner (for
> instance using {{mvn exec:java}}). It fails with:
> {code}
> [appclient-register-master-threadpool-0] INFO org.apache.spark.deploy.client.AppClient$ClientEndpoint - Connecting to master spark://10.200.118.197:7077...
> [shuffle-client-0] ERROR org.apache.spark.network.client.TransportClient - Failed to send RPC 6813731522650020739 to /10.200.118.197:7077: java.lang.AbstractMethodError: org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
> java.lang.AbstractMethodError: org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
>     at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:73)
>     at io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:107)
>     at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:820)
>     at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:733)
>     at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:111)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:748)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:740)
>     at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:826)
>     at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:733)
>     at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:284)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:748)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:740)
>     at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
>     at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1101)
>     at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1148)
>     at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1090)
>     at io.netty.util.concurrent.SingleThreadEventExecutor.safeExecute(SingleThreadEventExecutor.java:451)
>     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:401)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877)
>     at java.lang.Thread.run(Thread.java:745)
> [appclient-register-master-threadpool-0] WARN org.apache.spark.deploy.client.AppClient$ClientEndpoint - Failed to connect to master 10.200.118.197:7077
> java.io.IOException: Failed to send RPC 6813731522650020739 to /10.200.118.197:7077: java.lang.AbstractMethodError: org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
>     at org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
>     at org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
>     at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:514)
>     at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:507)
>     at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:486)
>     at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:427)
>     at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:129)
>     at io.netty.channel.AbstractChannelHandlerContext.notifyOutboundHandlerException(AbstractChannelHandlerContext.java:845)
>     at io.netty.channel.Ab
[jira] [Commented] (BEAM-1737) Implement a Single-output ParDo as a Multi-output ParDo with a single output
[ https://issues.apache.org/jira/browse/BEAM-1737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956637#comment-15956637 ] Amit Sela commented on BEAM-1737: - Changed the title since the change will actually do just that.
[jira] [Updated] (BEAM-1737) Implement a Single-output ParDo as a Multi-output ParDo with a single output
[ https://issues.apache.org/jira/browse/BEAM-1737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela updated BEAM-1737: Summary: Implement a Single-output ParDo as a Multi-output ParDo with a single output (was: Interpreting a Single-output ParDo as a Multi-output ParDo with a single output causes serialization failures) > Implement a Single-output ParDo as a Multi-output ParDo with a single output > > > Key: BEAM-1737 > URL: https://issues.apache.org/jira/browse/BEAM-1737 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Thomas Groh >Assignee: Amit Sela >Priority: Minor > > This is the cause of having a separate path and implementation for > single-output ParDos, even though both go through the same translator. > Partial stacktrace: > Tests run: 9, Failures: 0, Errors: 6, Skipped: 0, Time elapsed: 5.946 sec <<< > FAILURE! - in > org.apache.beam.runners.spark.translation.streaming.CreateStreamTest >[8233/41535] > testLateDataAccumulating(org.apache.beam.runners.spark.translation.streaming.CreateStreamTest) > Time elapsed: 3.593 sec <<< ERROR! 
> java.lang.RuntimeException: > java.io.NotSerializableException: DStream checkpointing has been enabled but > the DStreams with their functions are not serializable > org.apache.beam.runners.spark.translation.EvaluationContext > Serialization stack: > - object not serializable (class: > org.apache.beam.runners.spark.translation.EvaluationContext, value: > org.apache.beam.runners.spark.translation.EvaluationContext@a8c55d7) > - field (class: > org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1, > name: val$context, type: class > org.apache.beam.runners.spark.translation.EvaluationContext) > - object (class > org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1, > > org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1@44f50940) > - field (class: > org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, > name: transformFunc$3, type: interface > org.apache.spark.api.java.function.Function) > - object (class > org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, > ) > - field (class: > org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, > name: cleanedF$2, type: interface scala.Function1) > - object (class > org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, > ) > - field (class: > org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, > name: cleanedF$3, type: interface scala.Function2) > - object (class > org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, > ) > - writeObject data (class: > org.apache.spark.streaming.dstream.DStreamCheckpointData) > - object (class > org.apache.spark.streaming.dstream.DStreamCheckpointData, [ > 0 checkpoint files > ]) > > at > org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60) > at > 
org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77) > at > org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113) > at > org.apache.beam.runners.spark.TestSparkRunner.awaitWatermarksOrTimeout(TestSparkRunner.java:195) > at > org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:127) > at > org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:82) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210) > ... -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1737) Interpreting a Single-output ParDo as a Multi-output ParDo with a single output causes serialization failures
[ https://issues.apache.org/jira/browse/BEAM-1737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela reassigned BEAM-1737: --- Assignee: Amit Sela -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-1875) Remove Spark runner custom Hadoop and Avro IOs.
[ https://issues.apache.org/jira/browse/BEAM-1875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-1875. - Resolution: Fixed Fix Version/s: First stable release > Remove Spark runner custom Hadoop and Avro IOs. > --- > > Key: BEAM-1875 > URL: https://issues.apache.org/jira/browse/BEAM-1875 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela >Priority: Minor > Fix For: First stable release > > > Remove custom IOs, their evaluators, tests and usage. > The runner relies on the SDK, and its tests, for all IOs. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1737) Interpreting a Single-output ParDo as a Multi-output ParDo with a single output causes serialization failures
[ https://issues.apache.org/jira/browse/BEAM-1737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954833#comment-15954833 ] Amit Sela commented on BEAM-1737: - This happens simply because non-serializable objects such as {{EvaluationContext}} must not be used inside {{DStream}} lambdas. This line is simply not necessary and can be removed: https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java#L436 The step name is set before calling any {{DStream}} operation, for both single- and multi-output ParDos. I'll have a fix ready. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
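The failure mode described in the comment above (a function that must be serialized for checkpointing captures a non-serializable object, as the {{val$context}} field in the serialization stack shows) can be reproduced with plain Java serialization. The following is only a minimal sketch, not the runner's code: {{Context}} and {{SerializableFn}} are hypothetical stand-ins for {{EvaluationContext}} and a Spark function interface, and copying the step name into a local variable is one common workaround, whereas the fix proposed in the comment is simply to remove the unnecessary line.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ClosureCaptureSketch {

  /** Hypothetical stand-in for a non-serializable driver-side object such as EvaluationContext. */
  public static final class Context {
    private final String stepName;

    public Context(String stepName) {
      this.stepName = stepName;
    }

    public String getStepName() {
      return stepName;
    }
  }

  /** Hypothetical stand-in for a serializable function interface. */
  public interface SerializableFn extends Serializable {
    String apply(String input);
  }

  /** Captures the whole Context, so serializing the function fails. */
  public static SerializableFn capturingContext(Context context) {
    return input -> context.getStepName() + ":" + input;
  }

  /** Copies the needed value out first, so only a String is captured. */
  public static SerializableFn capturingStepName(Context context) {
    final String stepName = context.getStepName();
    return input -> stepName + ":" + input;
  }

  /** Returns true if Java serialization accepts the object, false on NotSerializableException. */
  public static boolean isSerializable(Object o) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(o);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Context context = new Context("step");
    System.out.println(isSerializable(capturingContext(context)));  // prints "false"
    System.out.println(isSerializable(capturingStepName(context))); // prints "true"
  }
}
```

Both functions behave identically when called; they differ only in what they drag along when written to a checkpoint, which is why this kind of bug tends to surface only once checkpointing is enabled.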
[jira] [Created] (BEAM-1875) Remove Spark runner custom Hadoop and Avro IOs.
Amit Sela created BEAM-1875: --- Summary: Remove Spark runner custom Hadoop and Avro IOs. Key: BEAM-1875 URL: https://issues.apache.org/jira/browse/BEAM-1875 Project: Beam Issue Type: Improvement Components: runner-spark Reporter: Amit Sela Assignee: Amit Sela Priority: Minor Remove custom IOs, their evaluators, tests and usage. The runner relies on the SDK, and its tests, for all IOs. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-848) Shuffle input read-values to get maximum parallelism.
[ https://issues.apache.org/jira/browse/BEAM-848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15953760#comment-15953760 ] Amit Sela commented on BEAM-848: Agreed; closed as invalid. If use cases prove the need, we can consider a "repartition" to max parallelism (or a pre-set one) after the read. > Shuffle input read-values to get maximum parallelism. > - > > Key: BEAM-848 > URL: https://issues.apache.org/jira/browse/BEAM-848 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Amit Sela >Assignee: Aviem Zur > Fix For: First stable release > > > It would be wise to shuffle the read values _after_ flatmap to increase > parallelism in processing of the data. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (BEAM-848) Shuffle input read-values to get maximum parallelism.
[ https://issues.apache.org/jira/browse/BEAM-848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela closed BEAM-848. -- Resolution: Invalid Fix Version/s: First stable release > Shuffle input read-values to get maximum parallelism. > - > > Key: BEAM-848 > URL: https://issues.apache.org/jira/browse/BEAM-848 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Amit Sela >Assignee: Aviem Zur > Fix For: First stable release > > > It would be wise to shuffle the read values _after_ flatmap to increase > parallelism in processing of the data. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (BEAM-696) Document: Side-Inputs non-deterministic with merging main-input windows
[ https://issues.apache.org/jira/browse/BEAM-696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela closed BEAM-696. -- Resolution: Not A Problem Fix Version/s: Not applicable Resolved by providing better documentation. See comments for details. > Document: Side-Inputs non-deterministic with merging main-input windows > --- > > Key: BEAM-696 > URL: https://issues.apache.org/jira/browse/BEAM-696 > Project: Beam > Issue Type: Task > Components: beam-model >Reporter: Ben Chambers >Assignee: Amit Sela > Fix For: Not applicable > > > Side-Inputs are non-deterministic for several reasons: > 1. Because they depend on triggering of the side-input (this is acceptable > because triggers are by their nature non-deterministic). > 2. They depend on the current state of the main-input window in order to > lookup the side-input. This means that with merging > 3. Any runner optimizations that affect when the side-input is looked up may > cause problems with either or both of these. > This issue focuses on #2 -- the non-determinism of side-inputs that execute > within a Merging WindowFn. > Possible solution would be to defer running anything that looks up the > side-input until we need to extract an output, and using the main-window at > that point. Specifically, if the main-window is a MergingWindowFn, don't > execute any kind of pre-combine, instead buffer all the inputs and combine > later. > This could still run into some non-determinism if there are triggers > controlling when we extract output. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-1827) Fix use of deprecated Spark APIs in the runner.
[ https://issues.apache.org/jira/browse/BEAM-1827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-1827. - Resolution: Fixed Fix Version/s: First stable release > Fix use of deprecated Spark APIs in the runner. > > > Key: BEAM-1827 > URL: https://issues.apache.org/jira/browse/BEAM-1827 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela >Priority: Minor > Fix For: First stable release > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1827) Fix use of deprecated Spark APIs in the runner.
Amit Sela created BEAM-1827: --- Summary: Fix use of deprecated Spark APIs in the runner. Key: BEAM-1827 URL: https://issues.apache.org/jira/browse/BEAM-1827 Project: Beam Issue Type: Improvement Components: runner-spark Reporter: Amit Sela Assignee: Amit Sela Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-1815) Avoid shuffling twice in GABW
[ https://issues.apache.org/jira/browse/BEAM-1815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-1815. - Resolution: Fixed Fix Version/s: First stable release > Avoid shuffling twice in GABW > - > > Key: BEAM-1815 > URL: https://issues.apache.org/jira/browse/BEAM-1815 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > Fix For: First stable release > > > Spark runner implementation of GABW includes a "built-in" groupByKey, but > BOBK before it already groups, so in order to avoid an unnecessary shuffle we > need to force a {{Partitioner}} on the RDDs involved. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (BEAM-1717) Maven release/deploy tries to uploads some artifacts more than once
[ https://issues.apache.org/jira/browse/BEAM-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15943698#comment-15943698 ] Amit Sela edited comment on BEAM-1717 at 3/27/17 5:46 PM: -- [~davor] {{beam-sdks-java-core}} is OK now, but now the release fails on a duplicate tests jar in the Dataflow runner module, I'll dig back in later this week but maybe you could also take a look there and see if something catches your attention. We're working our way through this one nicely module-by-module, just a little bit more effort and we'll get there ;) was (Author: amitsela): [~davor] {{beam-sdks-java-core}} is OK now, but now the release fails on duplicate in the Dataflow runner module, I'll dig back in later this week but maybe you could also take a look there and see if something catches your attention. We're working our way through this one nicely module-by-module, just a little bit more effort and we'll get there ;) > Maven release/deploy tries to uploads some artifacts more than once > > > Key: BEAM-1717 > URL: https://issues.apache.org/jira/browse/BEAM-1717 > Project: Beam > Issue Type: Bug > Components: build-system >Reporter: Amit Sela >Assignee: Amit Sela >Priority: Minor > Fix For: First stable release > > > Running maven {{release}} or {{deploy}} causes some artifacts to deploy more > than once which fails deployments to release Nexus. > While this is not an issue for the Apache release process (because it uses a > staging Nexus), this affects users who wish to deploy their own fork. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1717) Maven release/deploy tries to uploads some artifacts more than once
[ https://issues.apache.org/jira/browse/BEAM-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15943698#comment-15943698 ] Amit Sela commented on BEAM-1717: - [~davor] {{beam-sdks-java-core}} is OK now, but now the release fails on duplicate in the Dataflow runner module, I'll dig back in later this week but maybe you could also take a look there and see if something catches your attention. We're working our way through this one nicely module-by-module, just a little bit more effort and we'll get there ;) > Maven release/deploy tries to uploads some artifacts more than once > > > Key: BEAM-1717 > URL: https://issues.apache.org/jira/browse/BEAM-1717 > Project: Beam > Issue Type: Bug > Components: build-system >Reporter: Amit Sela >Assignee: Amit Sela >Priority: Minor > Fix For: First stable release > > > Running maven {{release}} or {{deploy}} causes some artifacts to deploy more > than once which fails deployments to release Nexus. > While this is not an issue for the Apache release process (because it uses a > staging Nexus), this affects users who wish to deploy their own fork. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1815) Avoid shuffling twice in GABW
Amit Sela created BEAM-1815: --- Summary: Avoid shuffling twice in GABW Key: BEAM-1815 URL: https://issues.apache.org/jira/browse/BEAM-1815 Project: Beam Issue Type: Bug Components: runner-spark Reporter: Amit Sela Assignee: Amit Sela Spark runner implementation of GABW includes a "built-in" groupByKey, but BOBK before it already groups, so in order to avoid an unnecessary shuffle we need to force a {{Partitioner}} on the RDDs involved. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-1802) Spark Runner does not shutdown correctly when executing multiple pipelines in sequence
[ https://issues.apache.org/jira/browse/BEAM-1802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-1802. - Resolution: Fixed Fix Version/s: First stable release
> Spark Runner does not shutdown correctly when executing multiple pipelines in sequence
>
> Key: BEAM-1802
> URL: https://issues.apache.org/jira/browse/BEAM-1802
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Reporter: Ismaël Mejía
> Assignee: Aviem Zur
> Fix For: First stable release
>
> I found this while running the Nexmark queries in sequence in local mode. I had the correct configuration but it didn't seem to work.
> 17/03/24 12:07:49 WARN org.apache.spark.SparkContext: Multiple running SparkContexts detected in the same JVM!
> org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
> org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
> org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:100)
> org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:69)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:206)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:91)
> org.apache.beam.sdk.Pipeline.run(Pipeline.java:266)
> org.apache.beam.integration.nexmark.NexmarkRunner.run(NexmarkRunner.java:1233)
> org.apache.beam.integration.nexmark.NexmarkDriver.runAll(NexmarkDriver.java:69)
> org.apache.beam.integration.nexmark.drivers.NexmarkSparkDriver.main(NexmarkSparkDriver.java:46)
> at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2257)
> at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2239)
-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1802) Spark Runner does not shutdown correctly when executing multiple pipelines in sequence
[ https://issues.apache.org/jira/browse/BEAM-1802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940422#comment-15940422 ] Amit Sela commented on BEAM-1802: - {{PipelineResult#cancel()}} should do the trick for now -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1802) Spark Runner does not shutdown correctly when executing multiple pipelines in sequence
[ https://issues.apache.org/jira/browse/BEAM-1802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940351#comment-15940351 ] Amit Sela commented on BEAM-1802: - The only valid scenario to expect the context to terminate is in batch jobs when blocking on {{waitUntilFinish()}} without timeout. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1802) Spark Runner does not shutdown correctly when executing multiple pipelines in sequence
[ https://issues.apache.org/jira/browse/BEAM-1802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940266#comment-15940266 ] Amit Sela commented on BEAM-1802: - [~iemejia] do you stop the pipeline between executions?
{code}
Pipeline p = ...
p.apply().apply()..
// waitUntilFinish() is called on the PipelineResult returned by run()
PipelineResult result = p.run();
result.waitUntilFinish();
// do you stop the pipeline at this point?
{code}
-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1717) Maven release/deploy tries to uploads some artifacts more than once
[ https://issues.apache.org/jira/browse/BEAM-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938372#comment-15938372 ] Amit Sela commented on BEAM-1717: - Looks like https://github.com/apache/beam/pull/2261 and the patch I described above fix the issues in {{beam-sdks-java-core}}. Now there's a similar issue in dataflow-runner, I'll keep digging. > Maven release/deploy tries to uploads some artifacts more than once > > > Key: BEAM-1717 > URL: https://issues.apache.org/jira/browse/BEAM-1717 > Project: Beam > Issue Type: Bug > Components: build-system >Reporter: Amit Sela >Assignee: Amit Sela >Priority: Minor > > Running maven {{release}} or {{deploy}} causes some artifacts to deploy more > than once which fails deployments to release Nexus. > While this is not an issue for the Apache release process (because it uses a > staging Nexus), this affects users who wish to deploy their own fork. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module
[ https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938115#comment-15938115 ] Amit Sela commented on BEAM-1789: - Please keep in mind that Spark runner support for streaming is still experimental. Once we finish up the work on streaming and have an IT running WindowedWordCount in streaming, we'll have better visibility.
> window can't not use in spark cluster module
>
> Key: BEAM-1789
> URL: https://issues.apache.org/jira/browse/BEAM-1789
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Reporter: tianyou
> Assignee: Amit Sela
>
> I use Beam on a Spark cluster. The application is below.
> SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
> options.setRunner(SparkRunner.class);
> options.setEnableSparkMetricSinks(false);
> options.setStreaming(true);
> options.setSparkMaster("spark://10.100.124.205:6066");
> options.setAppName("Beam App Spark"+new Random().nextFloat());
> options.setJobName("Beam Job Spark"+new Random().nextFloat());
> System.out.println("App Name:"+options.getAppName());
> System.out.println("Job Name:"+options.getJobName());
> options.setMaxRecordsPerBatch(10L);
>
> // PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
>
> // Duration size = Duration.standardMinutes(4);
> long duration = 60;
> if(args!=null && args.length==1){
>   duration = Integer.valueOf(args[0]);
> }
> Duration size = Duration.standardSeconds(duration);
> System.out.println("Time window: ["+duration+"] seconds");
> Window.Bound<KV<String, String>> fixWindow =
>     Window.<KV<String, String>>into(
>         FixedWindows.of(size)
>     );
>
> String kafkaAddress = "10.100.124.208:9093";
> // String kafkaAddress = "192.168.100.212:9092";
>
> Map<String, Object> kfConsunmerConf = new HashMap<String, Object>();
> kfConsunmerConf.put("auto.offset.reset", "latest");
> PCollection<String> kafkaJsonPc = p.apply(KafkaIO.<String, String>read()
>     .withBootstrapServers(kafkaAddress)
>     .withTopics(ImmutableList.of("wypxx1"))
>     .withKeyCoder(StringUtf8Coder.of())
>     .withValueCoder(StringUtf8Coder.of())
>     .updateConsumerProperties(kfConsunmerConf)
>     .withoutMetadata()
> ).apply(Values.<String>create());
>
> // Key each line by its first two characters.
> PCollection<KV<String, String>> totalPc = kafkaJsonPc.apply(
>     "count line",
>     ParDo.of(new DoFn<String, KV<String, String>>() {
>       @ProcessElement
>       public void processElement(ProcessContext c) {
>         String line = c.element();
>         Instant is = c.timestamp();
>         if(line.length()>2)
>           line = line.substring(0,2);
>         System.out.println(line + " " + is.toString());
>         c.output(KV.of(line, line));
>       }
>     })
> );
>
> // Window, then group by key; this is the output that never shows up on the cluster.
> PCollection<KV<String, Iterable<String>>> itPc =
>     totalPc.apply(fixWindow).apply(
>         "group by appKey",
>         GroupByKey.<String, String>create()
>     );
> itPc.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
>   @ProcessElement
>   public void processElement(ProcessContext c) {
>     KV<String, Iterable<String>> keyIt = c.element();
>     String key = keyIt.getKey();
>     Iterable<String> itb = keyIt.getValue();
>     Iterator<String> it = itb.iterator();
>     StringBuilder sb = new StringBuilder();
>     sb.append(key).append(":[");
>     while(it.hasNext()){
>       sb.append(it.next()).append(",");
>     }
>     String str = sb.toString();
>     str = str.substring(0,str.length() -1) + "]";
>     System.out.println(str);
>     String filePath = "/data/wyp/sparktest.txt";
>     String line = "word-->["+key+"]total count="+str+"--->time+"+c.timestamp().toString();
>     System.out.println("writefile->"+line);
>     FileUtil.write(filePath, line, true, true);
>   }
> }));
>
> p.run().waitUntilFinish();
>
> When I submit the application to the Spark cluster, I can see the log output of the totalPc PCollection in the Spark UI, but after one minute I still can't see any log output from the itPc PCollection.
> In local Spark mode it works well.
> Please help me resolve this problem. Thanks!
-- This message was sent by At
[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module
[ https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938032#comment-15938032 ] Amit Sela commented on BEAM-1789: - In local mode all the logging goes to one single log; in cluster mode each executor writes its log locally. That's what I'm trying to figure out: are you looking at the logs of all executors for the missing log line? Also, I think you might want to change the name of this issue, since it doesn't seem to be related to windows (or am I missing something)? > window can't not use in spark cluster module > > > Key: BEAM-1789 > URL: https://issues.apache.org/jira/browse/BEAM-1789 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: tianyou >Assignee: Amit Sela
[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module
[ https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15937871#comment-15937871 ] Amit Sela commented on BEAM-1789: - [~tianyou] where are you looking at the logs? In the Spark UI? Which executors? Not all executors will run tasks for all steps of the pipeline, so an executor that runs tasks that process totalPc but no task for itPc won't show the log. A couple of questions/notes: Do you see any exceptions/errors? Is the application behaving as expected (besides the logging)? Which Beam version are you running? Is there a particular reason you write to a file from within the DoFn instead of using Beam's TextIO/HdfsIO? > window can't not use in spark cluster module > > > Key: BEAM-1789 > URL: https://issues.apache.org/jira/browse/BEAM-1789 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: tianyou >Assignee: Amit Sela
[jira] [Commented] (BEAM-1775) fix issue of start_from_previous_offset in KafkaIO
[ https://issues.apache.org/jira/browse/BEAM-1775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15936210#comment-15936210 ] Amit Sela commented on BEAM-1775: - [~mingmxu] so you want this to be a "fallback" in case runners choose not to checkpoint? I believe that runners shouldn't/can't make this choice and still say they support reading from unbounded sources. > fix issue of start_from_previous_offset in KafkaIO > -- > > Key: BEAM-1775 > URL: https://issues.apache.org/jira/browse/BEAM-1775 > Project: Beam > Issue Type: Improvement > Components: sdk-java-extensions >Reporter: Xu Mingmin >Assignee: Xu Mingmin > > Jins George jins.geo...@aeris.net via aermail.onmicrosoft.com > > 5:50 PM (15 hours ago) > > to user > Hello, > I am writing a Beam pipeline (streaming) with the Flink runner to consume data > from Kafka, apply some transformations and persist to HBase. > If I restart the application (due to failure/manual restart), the consumer does > not resume from the offset where it was prior to the restart. It always resumes > from the latest offset. > If I enable Flink checkpointing with the HDFS state back-end, the system appears to > be resuming from the earliest offset. > Is there a recommended way to resume from the offset where it was stopped? > Thanks, > Jins George -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1765) Remove Aggregators from Spark runner
[ https://issues.apache.org/jira/browse/BEAM-1765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15935311#comment-15935311 ] Amit Sela commented on BEAM-1765: - I'm wondering what the timeline is here? Last I checked, the direct and Spark runners were the only runners to support {{Metrics}}. > Remove Aggregators from Spark runner > > > Key: BEAM-1765 > URL: https://issues.apache.org/jira/browse/BEAM-1765 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Pablo Estrada >Assignee: Amit Sela > > I have started removing aggregators from the Java SDK, but runners use them > in different ways that I can't figure out well. This is to track the > independent effort in Spark. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1775) fix issue of start_from_previous_offset in KafkaIO
[ https://issues.apache.org/jira/browse/BEAM-1775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15935305#comment-15935305 ] Amit Sela commented on BEAM-1775: - I think this might be a Flink-runner-specific issue (or just a missing/bad configuration), as I believe that both the Dataflow runner and the Spark runner will resume from the latest {{CheckpointMark}} upon restart. > fix issue of start_from_previous_offset in KafkaIO > -- > > Key: BEAM-1775 > URL: https://issues.apache.org/jira/browse/BEAM-1775 > Project: Beam > Issue Type: Improvement > Components: sdk-java-extensions >Reporter: Xu Mingmin >Assignee: Xu Mingmin > > Jins George jins.geo...@aeris.net via aermail.onmicrosoft.com > > 5:50 PM (15 hours ago) > > to user > Hello, > I am writing a Beam pipeline (streaming) with the Flink runner to consume data > from Kafka, apply some transformations and persist to HBase. > If I restart the application (due to failure/manual restart), the consumer does > not resume from the offset where it was prior to the restart. It always resumes > from the latest offset. > If I enable Flink checkpointing with the HDFS state back-end, the system appears to > be resuming from the earliest offset. > Is there a recommended way to resume from the offset where it was stopped? > Thanks, > Jins George -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-507) Fill in the documentation/runners/spark portion of the website
[ https://issues.apache.org/jira/browse/BEAM-507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15933238#comment-15933238 ] Amit Sela commented on BEAM-507: Yup, thanks for the GC ;) > Fill in the documentation/runners/spark portion of the website > -- > > Key: BEAM-507 > URL: https://issues.apache.org/jira/browse/BEAM-507 > Project: Beam > Issue Type: Bug > Components: website >Reporter: Frances Perry >Assignee: Amit Sela > Fix For: Not applicable > > > As per > https://docs.google.com/document/d/1-0jMv7NnYp0Ttt4voulUMwVe_qjBYeNMLm2LusYF3gQ/edit. > Should be a landing page for Spark-specific information. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-507) Fill in the documentation/runners/spark portion of the website
[ https://issues.apache.org/jira/browse/BEAM-507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-507. Resolution: Fixed Fix Version/s: Not applicable > Fill in the documentation/runners/spark portion of the website > -- > > Key: BEAM-507 > URL: https://issues.apache.org/jira/browse/BEAM-507 > Project: Beam > Issue Type: Bug > Components: website >Reporter: Frances Perry >Assignee: Amit Sela > Fix For: Not applicable > > > As per > https://docs.google.com/document/d/1-0jMv7NnYp0Ttt4voulUMwVe_qjBYeNMLm2LusYF3gQ/edit. > Should be a landing page for Spark-specific information. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
[ https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931859#comment-15931859 ] Amit Sela commented on BEAM-1582: - Moved tests that use checkpoint recovery to post-commit. Keeping open since the problem is still there. > ResumeFromCheckpointStreamingTest flakes with what appears as a second firing. > -- > > Key: BEAM-1582 > URL: https://issues.apache.org/jira/browse/BEAM-1582 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > Fix For: First stable release > > > See: > https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/ > After some digging in it appears that a second firing occurs (though only one > is expected) but it doesn't come from a stale state (state is empty before it > fires). > Might be a retry happening for some reason, which is OK in terms of > fault-tolerance guarantees (at-least-once), but not so much in terms of flaky > tests. > I'm looking into this hoping to fix this ASAP. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-1752) Tag Spark runner tests that recover from checkpoint.
[ https://issues.apache.org/jira/browse/BEAM-1752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-1752. - Resolution: Fixed Fix Version/s: First stable release > Tag Spark runner tests that recover from checkpoint. > > > Key: BEAM-1752 > URL: https://issues.apache.org/jira/browse/BEAM-1752 > Project: Beam > Issue Type: Test > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > Fix For: First stable release > > > Add a tag to tag tests that validate using recovery from checkpoint. > This could help running such tests as part of a profile (e.g., PostCommit) > since they tend to flake because of Spark related issues. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1752) Tag Spark runner tests that recover from checkpoint.
Amit Sela created BEAM-1752: --- Summary: Tag Spark runner tests that recover from checkpoint. Key: BEAM-1752 URL: https://issues.apache.org/jira/browse/BEAM-1752 Project: Beam Issue Type: Test Components: runner-spark Reporter: Amit Sela Assignee: Amit Sela Add a tag to tag tests that validate using recovery from checkpoint. This could help running such tests as part of a profile (e.g., PostCommit) since they tend to flake because of Spark related issues. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
[ https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895861#comment-15895861 ] Amit Sela edited comment on BEAM-1582 at 3/17/17 10:21 PM: --- Could be related to SPARK-14701 and/or SPARK-14930 so that the {{CheckpointMark}} is not properly checkpointed. If for some reason the runtime environment was so slow that it failed to start execution before the timeout was hit, a graceful stop would force at least the first batch to finish. If this first batch included the read from Kafka but failed to checkpoint the {{Reader}} mark, resuming from checkpoint would read the whole Kafka backlog again, causing the failures we see. I'll look at the execution times of the failed tests to figure out whether that seems to be the case, and if so I will simply move this test to post-commit, because this issue in Spark was only resolved in v2.0. was (Author: amitsela): Could be related to SPARK-14701 and/or SPARK-14930 so that the last {{CheckpointMark}} is not properly checkpointed. If for some reason the runtime environment was so slow that it failed to start execution before the timeout was hit, a graceful stop would force at least the first batch to finish. If this first batch included the read from Kafka but failed to checkpoint the {{Reader}} mark, resuming from checkpoint would read the whole Kafka backlog again, causing the failures we see. I'll look at the execution times of the failed tests to figure out whether that seems to be the case, and if so I will simply move this test to post-commit, because this issue in Spark was only resolved in v2.0. > ResumeFromCheckpointStreamingTest flakes with what appears as a second firing. 
> -- > > Key: BEAM-1582 > URL: https://issues.apache.org/jira/browse/BEAM-1582 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > Fix For: First stable release > > > See: > https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/ > After some digging in it appears that a second firing occurs (though only one > is expected) but it doesn't come from a stale state (state is empty before it > fires). > Might be a retry happening for some reason, which is OK in terms of > fault-tolerance guarantees (at-least-once), but not so much in terms of flaky > tests. > I'm looking into this hoping to fix this ASAP. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
[ https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895861#comment-15895861 ] Amit Sela edited comment on BEAM-1582 at 3/17/17 10:21 PM: --- Could be related to SPARK-14701 and/or SPARK-14930 so that the last {{CheckpointMark}} is not properly checkpointed. If for some reason the runtime environment was so slow that it failed to start execution before the timeout was hit, a graceful stop would force at least the first batch to finish. If this first batch included the read from Kafka but failed to checkpoint the {{Reader}} mark, resuming from checkpoint would read the whole Kafka backlog again, causing the failures we see. I'll look at the execution times of the failed tests to figure out whether that seems to be the case, and if so I will simply move this test to post-commit, because this issue in Spark was only resolved in v2.0. was (Author: amitsela): Could be related to SPARK-14701 so that the last {{CheckpointMark}} is not properly checkpointed. If for some reason the runtime environment was so slow that it failed to start execution before the timeout was hit, a graceful stop would force at least the first batch to finish. If this first batch included the read from Kafka but failed to checkpoint the {{Reader}} mark, resuming from checkpoint would read the whole Kafka backlog again, causing the failures we see. I'll look at the execution times of the failed tests to figure out whether that seems to be the case, and if so I will simply move this test to post-commit, because this issue in Spark was only resolved in v2.0. > ResumeFromCheckpointStreamingTest flakes with what appears as a second firing. 
> -- > > Key: BEAM-1582 > URL: https://issues.apache.org/jira/browse/BEAM-1582 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > Fix For: First stable release > > > See: > https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/ > After some digging in it appears that a second firing occurs (though only one > is expected) but it doesn't come from a stale state (state is empty before it > fires). > Might be a retry happening for some reason, which is OK in terms of > fault-tolerance guarantees (at-least-once), but not so much in terms of flaky > tests. > I'm looking into this hoping to fix this ASAP. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
[ https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895861#comment-15895861 ] Amit Sela edited comment on BEAM-1582 at 3/17/17 10:19 PM: --- Could be related to SPARK-14701 so that the last {{CheckpointMark}} is not properly checkpointed. If for some reason the runtime environment was so slow that it failed to start execution before the timeout was hit, a graceful stop would force at least the first batch to finish. If this first batch included the read from Kafka but failed to checkpoint the {{Reader}} mark, resuming from checkpoint would read the whole Kafka backlog again, causing the failures we see. I'll look at the execution times of the failed tests to figure out whether that seems to be the case, and if so I will simply move this test to post-commit, because this issue in Spark was only resolved in v2.0. was (Author: amitsela): Could be related to SPARK-16480 so that the last {{CheckpointMark}} is not properly checkpointed. If for some reason the runtime environment was so slow that it failed to start execution before the timeout was hit, a graceful stop would force at least the first batch to finish. If this first batch included the read from Kafka but failed to checkpoint the {{Reader}} mark, resuming from checkpoint would read the whole Kafka backlog again, causing the failures we see. I'll look at the execution times of the failed tests to figure out whether that seems to be the case, and if so I will simply move this test to post-commit, because this issue in Spark was only resolved in v2.0. > ResumeFromCheckpointStreamingTest flakes with what appears as a second firing. 
> -- > > Key: BEAM-1582 > URL: https://issues.apache.org/jira/browse/BEAM-1582 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > Fix For: First stable release > > > See: > https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/ > After some digging in it appears that a second firing occurs (though only one > is expected) but it doesn't come from a stale state (state is empty before it > fires). > Might be a retry happening for some reason, which is OK in terms of > fault-tolerance guarantees (at-least-once), but not so much in terms of flaky > tests. > I'm looking into this hoping to fix this ASAP. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
[ https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895861#comment-15895861 ] Amit Sela edited comment on BEAM-1582 at 3/17/17 10:14 PM: --- Could be related to SPARK-16480 so that the last {{CheckpointMark}} is not properly checkpointed. If for some reason the runtime environment was so slow that it failed to start execution before the timeout was hit, a graceful stop would force at least the first batch to finish. If this first batch included the read from Kafka but failed to checkpoint the {{Reader}} mark, resuming from checkpoint would read the whole Kafka backlog again, causing the failures we see. I'll look at the execution times of the failed tests to figure out whether that seems to be the case, and if so I will simply move this test to post-commit, because this issue in Spark was only resolved in v2.0. was (Author: amitsela): Could be related to SPARK-16480 so that the last {{CheckpointMark}} is not properly checkpointed. > ResumeFromCheckpointStreamingTest flakes with what appears as a second firing. > -- > > Key: BEAM-1582 > URL: https://issues.apache.org/jira/browse/BEAM-1582 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > Fix For: First stable release > > > See: > https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/ > After some digging in it appears that a second firing occurs (though only one > is expected) but it doesn't come from a stale state (state is empty before it > fires). > Might be a retry happening for some reason, which is OK in terms of > fault-tolerance guarantees (at-least-once), but not so much in terms of flaky > tests. > I'm looking into this hoping to fix this ASAP. 
-- This message was sent by Atlassian JIRA (v6.3.15#6346)
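For illustration only, the failure mode described in the comment above (a first batch that is read, but whose {{CheckpointMark}} is never persisted) can be simulated in a few lines of plain Java. This is a minimal sketch under stated assumptions, not Beam or Spark code: {{CheckpointResumeSketch}}, {{readBatch}} and {{runWithRestart}} are hypothetical names, and an in-memory list stands in for the Kafka backlog.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation; none of these names exist in Beam or Spark.
final class CheckpointResumeSketch {

    /** Reads up to batchSize records from the log, starting at fromOffset. */
    static List<Integer> readBatch(List<Integer> log, int fromOffset, int batchSize) {
        int end = Math.min(log.size(), fromOffset + batchSize);
        return new ArrayList<>(log.subList(fromOffset, end));
    }

    /**
     * Processes one batch, "stops", then resumes from the checkpointed offset
     * and drains the rest of the log. When markPersisted is false, the offset
     * reached by the first batch is lost, so the resumed run reads it again.
     */
    static List<Integer> runWithRestart(List<Integer> log, int batchSize, boolean markPersisted) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Integer> emitted = new ArrayList<>();
        int checkpointedOffset = 0;

        // First run: a graceful stop lets exactly one batch finish.
        List<Integer> first = readBatch(log, 0, batchSize);
        emitted.addAll(first);
        if (markPersisted) {
            checkpointedOffset = first.size();
        }

        // Resumed run: start from whatever offset survived the restart.
        int offset = checkpointedOffset;
        while (offset < log.size()) {
            List<Integer> batch = readBatch(log, offset, batchSize);
            emitted.addAll(batch);
            offset += batch.size();
        }
        return emitted;
    }
}
```

With a four-record log and a batch size of two, the run that persists the mark emits each record once, while the run that loses the mark emits the first batch twice. That duplicate output is consistent with the unexpected second firing reported in the test.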
[jira] [Commented] (BEAM-1717) Maven release/deploy tries to uploads some artifacts more than once
[ https://issues.apache.org/jira/browse/BEAM-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924819#comment-15924819 ] Amit Sela commented on BEAM-1717: - Once this was worked out, I noticed a duplicate {{test}} jar in {{beam-sdks-java-core}}. I'm not sure what the issue is here, but running Maven in debug mode shows that, in the execution of {{maven-install-plugin}}, the {{attachedArtifacts}} contain both {{org.apache.beam:beam-sdks-java-core:test-jar:tests}} and {{org.apache.beam:beam-sdks-java-core:jar:tests}}, which seems to cause the duplication. While the {{default-test-jar}} execution id (jar plugin) calls the {{test-jar}} goal, I wonder if the {{jar}} goal also creates a {{tests}} jar. > Maven release/deploy tries to uploads some artifacts more than once > > > Key: BEAM-1717 > URL: https://issues.apache.org/jira/browse/BEAM-1717 > Project: Beam > Issue Type: Bug > Components: build-system >Reporter: Amit Sela >Assignee: Amit Sela >Priority: Minor > > Running maven {{release}} or {{deploy}} causes some artifacts to deploy more > than once which fails deployments to release Nexus. > While this is not an issue for the Apache release process (because it uses a > staging Nexus), this affects users who wish to deploy their own fork. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1717) Maven release/deploy tries to uploads some artifacts more than once
[ https://issues.apache.org/jira/browse/BEAM-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924812#comment-15924812 ] Amit Sela commented on BEAM-1717: - It seems that there is more than one issue here. The first one is with {{beam-sdks-common-fn-api}}, which produces the {{javadoc}} jar twice. This seemed to be resolved by changing the execution id of {{maven-javadoc-plugin}} in {{beam-parent.xml}} from {{javadoc}} to {{attach-javadocs}} like this:
{code}
...
<profile>
  <id>release</id>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-javadoc-plugin</artifactId>
        <executions>
          <execution>
            <id>attach-javadocs</id>
            <phase>package</phase>
            <goals>
              <goal>jar</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
...
{code}
> Maven release/deploy tries to uploads some artifacts more than once > > > Key: BEAM-1717 > URL: https://issues.apache.org/jira/browse/BEAM-1717 > Project: Beam > Issue Type: Bug > Components: build-system >Reporter: Amit Sela >Assignee: Amit Sela >Priority: Minor > > Running maven {{release}} or {{deploy}} causes some artifacts to deploy more > than once which fails deployments to release Nexus. > While this is not an issue for the Apache release process (because it uses a > staging Nexus), this affects users who wish to deploy their own fork. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1717) Maven release/deploy tries to uploads some artifacts more than once
Amit Sela created BEAM-1717: --- Summary: Maven release/deploy tries to uploads some artifacts more than once Key: BEAM-1717 URL: https://issues.apache.org/jira/browse/BEAM-1717 Project: Beam Issue Type: Bug Components: build-system Reporter: Amit Sela Assignee: Amit Sela Priority: Minor Running maven {{release}} or {{deploy}} causes some artifacts to deploy more than once which fails deployments to release Nexus. While this is not an issue for the Apache release process (because it uses a staging Nexus), this affects users who wish to deploy their own fork. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Reopened] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
[ https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela reopened BEAM-1582: - This test keeps flaking so I'll leave it open until we resolve it, or move to PostCommit and accept the fact that it flakes at times. > ResumeFromCheckpointStreamingTest flakes with what appears as a second firing. > -- > > Key: BEAM-1582 > URL: https://issues.apache.org/jira/browse/BEAM-1582 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > Fix For: First stable release > > > See: > https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/ > After some digging in it appears that a second firing occurs (though only one > is expected) but it doesn't come from a stale state (state is empty before it > fires). > Might be a retry happening for some reason, which is OK in terms of > fault-tolerance guarantees (at-least-once), but not so much in terms of flaky > tests. > I'm looking into this hoping to fix this ASAP. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-797) A PipelineVisitor that creates a Spark-native pipeline.
[ https://issues.apache.org/jira/browse/BEAM-797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-797. Resolution: Fixed Fix Version/s: First stable release > A PipelineVisitor that creates a Spark-native pipeline. > > > Key: BEAM-797 > URL: https://issues.apache.org/jira/browse/BEAM-797 > Project: Beam > Issue Type: Wish > Components: runner-spark >Reporter: Amit Sela >Assignee: Aviem Zur >Priority: Minor > Fix For: First stable release > > > It could be very useful for debugging purposes to have a custom > PipelineVisitor that can tell what's the underlying Spark code that is being > called. > One idea: > This could be called with a flag in SparkPipelineOptions and instead of > executing the pipeline, it would print the underlying Spark DAG. > Clearly, DoFn internals would be obfuscated, but the Spark code could note > {{mapPartitions("ExtractWords")}} > Another difference would be Sources as they are a custom implementation for > Beam. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-1562) Use a "signal" to stop streaming tests as they finish.
[ https://issues.apache.org/jira/browse/BEAM-1562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-1562. - Resolution: Fixed Fix Version/s: First stable release > Use a "signal" to stop streaming tests as they finish. > -- > > Key: BEAM-1562 > URL: https://issues.apache.org/jira/browse/BEAM-1562 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > Fix For: First stable release > > > Streaming tests use a timeout that has to take a large enough buffer to avoid > slow runtimes stopping before test completes. > We can introduce a "poison pill" based on an {{EndOfStream}} element that > would be counted in a Metrics/Aggregator to know all data was processed. > Another option would be to follow the slowest WM and stop once it hits > end-of-time. > This requires the Spark runner to stop blocking the execution of a streaming > pipeline until it's complete - which is something relevant to > {{PipelineResult}} (re)design that is being discussed in BEAM-849 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
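The "poison pill" idea in BEAM-1562 can be illustrated with a minimal plain-Java sketch. This is not the Spark runner's implementation: {{EndOfStreamSignalSketch}}, {{END_OF_STREAM}} and {{drainUntilSignal}} are hypothetical names, and a single in-memory queue stands in for the streaming input. A distributed version would presumably have to count the signal elements in a Metrics/Aggregator, as the description suggests, rather than return on the first one seen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class EndOfStreamSignalSketch {
  // Hypothetical sentinel standing in for the EndOfStream element.
  static final String END_OF_STREAM = "__END_OF_STREAM__";

  /**
   * Consumes elements until the sentinel arrives and returns everything seen before it.
   * The timeout is only a safety net; the normal stop condition is the signal itself.
   */
  static List<String> drainUntilSignal(BlockingQueue<String> stream, long safetyTimeoutMillis) {
    List<String> processed = new ArrayList<>();
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(safetyTimeoutMillis);
    try {
      while (true) {
        // Wait only for the time remaining until the safety deadline.
        String element = stream.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
        if (element == null) {
          throw new IllegalStateException("No end-of-stream signal before the safety timeout");
        }
        if (END_OF_STREAM.equals(element)) {
          return processed; // All data before the signal has been processed.
        }
        processed.add(element);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for the signal", e);
    }
  }

  public static void main(String[] args) {
    BlockingQueue<String> stream = new LinkedBlockingQueue<>();
    stream.add("a");
    stream.add("b");
    stream.add(END_OF_STREAM);
    System.out.println(drainUntilSignal(stream, 5_000));
  }
}
```

With a signal, the test finishes as soon as the last element is seen, so the timeout can be generous without slowing down the common case.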
[jira] [Resolved] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
[ https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-1582. - Resolution: Fixed Fix Version/s: First stable release > ResumeFromCheckpointStreamingTest flakes with what appears as a second firing. > -- > > Key: BEAM-1582 > URL: https://issues.apache.org/jira/browse/BEAM-1582 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > Fix For: First stable release > > > See: > https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/ > After some digging in it appears that a second firing occurs (though only one > is expected) but it doesn't come from a stale state (state is empty before it > fires). > Might be a retry happening for some reason, which is OK in terms of > fault-tolerance guarantees (at-least-once), but not so much in terms of flaky > tests. > I'm looking into this hoping to fix this ASAP. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-1636) UnboundedDataset action() does not materialize RDD
[ https://issues.apache.org/jira/browse/BEAM-1636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-1636. - Resolution: Fixed Fix Version/s: 0.6.0 > UnboundedDataset action() does not materialize RDD > -- > > Key: BEAM-1636 > URL: https://issues.apache.org/jira/browse/BEAM-1636 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Aviem Zur >Assignee: Aviem Zur > Fix For: 0.6.0 > > > {{UnboundedDataset#action}} does not materialize DStream since it uses > {{register}}. Instead use {{foreachRDD}}...{{foreach}} which is an action, > which will materialize the RDDs of the DStream. > The reason it worked until now was because there was also a call to > {{cache()}} before it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-1556) Spark executors need to register IO factories
[ https://issues.apache.org/jira/browse/BEAM-1556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-1556. - Resolution: Fixed Fix Version/s: 0.6.0 > Spark executors need to register IO factories > - > > Key: BEAM-1556 > URL: https://issues.apache.org/jira/browse/BEAM-1556 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Frances Perry >Assignee: Amit Sela > Fix For: 0.6.0 > > > The Spark executors need to call IOChannelUtils.registerIOFactories(options) > in order to support GCS file and make the default WordCount example work. > Context in this thread: > https://lists.apache.org/thread.html/469a139c9eb07e64e514cdea42ab8000678ab743794a090c365205d7@%3Cuser.beam.apache.org%3E -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-1623) Transform Reshuffle directly in Spark runner
[ https://issues.apache.org/jira/browse/BEAM-1623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-1623. - Resolution: Fixed Fix Version/s: 0.6.0 > Transform Reshuffle directly in Spark runner > > > Key: BEAM-1623 > URL: https://issues.apache.org/jira/browse/BEAM-1623 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Aviem Zur >Assignee: Aviem Zur > Fix For: 0.6.0 > > > Transform {{Reshuffle}} directly in Spark runner. > Spark's {{repartition}} is logically equivalent to {{Reshuffle}} and will > cause the required fusion break without incurring the overhead of > {{groupByKey}} (In streaming pipelines this will use {{updateStateByKey}}) > compared to a simple {{repartition}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-1626) Remove caching of read MapWithStateDStream.
[ https://issues.apache.org/jira/browse/BEAM-1626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-1626. - Resolution: Fixed Fix Version/s: 0.6.0 > Remove caching of read MapWithStateDStream. > --- > > Key: BEAM-1626 > URL: https://issues.apache.org/jira/browse/BEAM-1626 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > Fix For: 0.6.0 > > > There's no real need for it since checkpointing caches as well, and from my > experiments I think it also has something to do with some of the flakes in > streaming tests. > Anyway, I don't see a good reason to call {{cache()}} there, so let's remove > it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-1565) Update Spark runner PostCommit Jenkins job.
[ https://issues.apache.org/jira/browse/BEAM-1565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-1565. - Resolution: Fixed Fix Version/s: 0.6.0 > Update Spark runner PostCommit Jenkins job. > --- > > Key: BEAM-1565 > URL: https://issues.apache.org/jira/browse/BEAM-1565 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Amit Sela >Assignee: Aviem Zur >Priority: Minor > Labels: beginner, low-hanging-fruit > Fix For: 0.6.0 > > > Should only activate {{runnable-on-service}} profile (and > {{streaming-runnable-on-service}} once enabled) and remove > {{-Dspark.port.maxRetries=64}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1556) Spark executors need to register IO factories
[ https://issues.apache.org/jira/browse/BEAM-1556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela reassigned BEAM-1556: --- Assignee: Amit Sela (was: Jean-Baptiste Onofré) > Spark executors need to register IO factories > - > > Key: BEAM-1556 > URL: https://issues.apache.org/jira/browse/BEAM-1556 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Frances Perry >Assignee: Amit Sela > > The Spark executors need to call IOChannelUtils.registerIOFactories(options) > in order to support GCS file and make the default WordCount example work. > Context in this thread: > https://lists.apache.org/thread.html/469a139c9eb07e64e514cdea42ab8000678ab743794a090c365205d7@%3Cuser.beam.apache.org%3E -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1626) Remove caching of read MapWithStateDStream.
Amit Sela created BEAM-1626: --- Summary: Remove caching of read MapWithStateDStream. Key: BEAM-1626 URL: https://issues.apache.org/jira/browse/BEAM-1626 Project: Beam Issue Type: Bug Components: runner-spark Reporter: Amit Sela Assignee: Amit Sela There's no real need for it since checkpointing caches as well, and from my experiments I think it also has something to do with some of the flakes in streaming tests. Anyway, I don't see a good reason to call {{cache()}} there, so let's remove it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-1625) BoundedDataset action() does not materialize RDD
[ https://issues.apache.org/jira/browse/BEAM-1625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-1625. - Resolution: Fixed Fix Version/s: 0.6.0 > BoundedDataset action() does not materialize RDD > > > Key: BEAM-1625 > URL: https://issues.apache.org/jira/browse/BEAM-1625 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Aviem Zur >Assignee: Aviem Zur > Fix For: 0.6.0 > > > {{BoundedDataset#action}} does not materialize RDD since it uses > {{foreachPartition}} which is not an action, instead use {{foreach}} which is. > See: http://spark.apache.org/docs/latest/programming-guide.html#actions -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1608) Add support for Spark cluster metrics to PKB
[ https://issues.apache.org/jira/browse/BEAM-1608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela reassigned BEAM-1608: --- Assignee: (was: Amit Sela) > Add support for Spark cluster metrics to PKB > > > Key: BEAM-1608 > URL: https://issues.apache.org/jira/browse/BEAM-1608 > Project: Beam > Issue Type: Bug > Components: runner-spark, testing >Reporter: Jason Kuster > > See > https://docs.google.com/document/d/1PsjGPSN6FuorEEPrKEP3u3m16tyOzph5FnL2DhaRDz0/edit?ts=58a78e73#heading=h.exn0s6jsm24q > for more details on what this entails. > Blocked on BEAM-1602, adding support for Spark to PKB. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1602) Spark Runner support for PerfKit Benchmarker
[ https://issues.apache.org/jira/browse/BEAM-1602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela reassigned BEAM-1602: --- Assignee: (was: Amit Sela) > Spark Runner support for PerfKit Benchmarker > > > Key: BEAM-1602 > URL: https://issues.apache.org/jira/browse/BEAM-1602 > Project: Beam > Issue Type: Bug > Components: runner-spark, testing >Reporter: Jason Kuster > > See > https://docs.google.com/document/d/1PsjGPSN6FuorEEPrKEP3u3m16tyOzph5FnL2DhaRDz0/edit?ts=58a78e73#heading=h.exn0s6jsm24q > for more details on what this entails. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1602) Spark Runner support for PerfKit Benchmarker
[ https://issues.apache.org/jira/browse/BEAM-1602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895863#comment-15895863 ] Amit Sela commented on BEAM-1602: - [~jasonkuster] what is Spark missing here? Running an IT against a cluster? Why not use `spark-submit`? Or does it have to be done programmatically? > Spark Runner support for PerfKit Benchmarker > > > Key: BEAM-1602 > URL: https://issues.apache.org/jira/browse/BEAM-1602 > Project: Beam > Issue Type: Bug > Components: runner-spark, testing >Reporter: Jason Kuster >Assignee: Amit Sela > > See > https://docs.google.com/document/d/1PsjGPSN6FuorEEPrKEP3u3m16tyOzph5FnL2DhaRDz0/edit?ts=58a78e73#heading=h.exn0s6jsm24q > for more details on what this entails. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
[ https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895861#comment-15895861 ] Amit Sela commented on BEAM-1582: - Could be related to SPARK-16480 so that the last {{CheckpointMark}} is not properly checkpointed. > ResumeFromCheckpointStreamingTest flakes with what appears as a second firing. > -- > > Key: BEAM-1582 > URL: https://issues.apache.org/jira/browse/BEAM-1582 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > > See: > https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/ > After some digging in it appears that a second firing occurs (though only one > is expected) but it doesn't come from a stale state (state is empty before it > fires). > Might be a retry happening for some reason, which is OK in terms of > fault-tolerance guarantees (at-least-once), but not so much in terms of flaky > tests. > I'm looking into this hoping to fix this ASAP. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1591) Implement Combine optimizations for GABW in streaming.
Amit Sela created BEAM-1591: --- Summary: Implement Combine optimizations for GABW in streaming. Key: BEAM-1591 URL: https://issues.apache.org/jira/browse/BEAM-1591 Project: Beam Issue Type: Improvement Components: runner-spark Reporter: Amit Sela This should be straightforward. Introduce {{AccumT}} generics in {{SparkGroupAlsoByWindowViaWindowSet}} and call it with {{InputT}} for GBK and with {{AccumT}} for Combine. Pass in the proper {{SystemReduceFn}} instead of creating it in {{SparkGroupAlsoByWindowViaWindowSet}}. For Combine, extract the output from the fired accumulated output. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
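The role of the {{AccumT}} type parameter can be sketched in plain Java. This is only an illustration under assumptions: {{AccumulatingReduceSketch}}, {{buffering}}, {{meanCombining}} and {{fire}} are hypothetical names and do not mirror the API of {{SparkGroupAlsoByWindowViaWindowSet}} or {{SystemReduceFn}}. The point is that a GBK-style reduction keeps every input as its accumulator, while a Combine-style reduction keeps a compact accumulator and extracts the output only when it fires.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical reduction parameterized by input, accumulator and output types.
final class AccumulatingReduceSketch<InputT, AccumT, OutputT> {
  private final Supplier<AccumT> createAccumulator;
  private final BiFunction<AccumT, InputT, AccumT> addInput;
  private final Function<AccumT, OutputT> extractOutput;

  AccumulatingReduceSketch(
      Supplier<AccumT> createAccumulator,
      BiFunction<AccumT, InputT, AccumT> addInput,
      Function<AccumT, OutputT> extractOutput) {
    this.createAccumulator = createAccumulator;
    this.addInput = addInput;
    this.extractOutput = extractOutput;
  }

  /** GBK-like: the accumulator is the buffer of all inputs and is emitted as-is. */
  static <T> AccumulatingReduceSketch<T, List<T>, List<T>> buffering() {
    return new AccumulatingReduceSketch<T, List<T>, List<T>>(
        ArrayList::new,
        (buffer, input) -> {
          buffer.add(input);
          return buffer;
        },
        buffer -> buffer);
  }

  /** Combine-like: the accumulator is {sum, count}; the mean is extracted on firing. */
  static AccumulatingReduceSketch<Integer, long[], Double> meanCombining() {
    return new AccumulatingReduceSketch<Integer, long[], Double>(
        () -> new long[] {0L, 0L},
        (acc, input) -> {
          acc[0] += input;
          acc[1]++;
          return acc;
        },
        acc -> acc[1] == 0 ? Double.NaN : (double) acc[0] / acc[1]);
  }

  /** Folds the inputs into an accumulator, then extracts the output from it. */
  OutputT fire(Iterable<InputT> inputs) {
    AccumT accumulator = createAccumulator.get();
    for (InputT input : inputs) {
      accumulator = addInput.apply(accumulator, input);
    }
    return extractOutput.apply(accumulator);
  }

  public static void main(String[] args) {
    System.out.println(AccumulatingReduceSketch.<String>buffering().fire(Arrays.asList("a", "b")));
    System.out.println(meanCombining().fire(Arrays.asList(1, 2, 3, 6)));
  }
}
```

The combining variant holds two numbers per key and window regardless of input volume, which is the kind of saving the Combine optimization is after.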
[jira] [Updated] (BEAM-673) Data locality for Read.Bounded
[ https://issues.apache.org/jira/browse/BEAM-673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela updated BEAM-673: --- Description: In some distributed filesystems, such as HDFS, we should be able to hint to Spark the preferred locations of splits. Here is an example of how Spark does that for Hadoop RDDs: https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L249 was: In some distributed filesystems, such as HDFS, we should be able to hint to Spark the preferred locations of splits. Here is an example of how Spark does that for Hadoop RDDs: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L252 *Note: in case of 1-to-1 mapping of Read operation (e.g. TextIO) direct translation should still be preferred, but this is pending HDFS support for Beam anyway.* > Data locality for Read.Bounded > -- > > Key: BEAM-673 > URL: https://issues.apache.org/jira/browse/BEAM-673 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Amit Sela > Fix For: First stable release > > > In some distributed filesystems, such as HDFS, we should be able to hint to > Spark the preferred locations of splits. > Here is an example of how Spark does that for Hadoop RDDs: > https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L249 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
[ https://issues.apache.org/jira/browse/BEAM-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891188#comment-15891188 ] Amit Sela commented on BEAM-1582: - Looks like the flake happens when the entire input is re-read. We inject 4 elements into Kafka before the first run, and 2 more before the second. When all is well, printing the number of elements read by SparkUnboundedSource's {{readUnboundedStream}} JavaDStream says 4 (sometimes 1 followed by 3) in the first run, and 2 in the second, but in failures, it reads 6 in the second. This would happen if the readers' checkpoints are not persisted for some reason, causing KafkaIO to fall back to the default "earliest" and so read everything. This happens even though the checkpoint interval equals the batch interval. I will check if there's a way to guarantee/block on checkpointing. > ResumeFromCheckpointStreamingTest flakes with what appears as a second firing. > -- > > Key: BEAM-1582 > URL: https://issues.apache.org/jira/browse/BEAM-1582 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > > See: > https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/ > After some digging in it appears that a second firing occurs (though only one > is expected) but it doesn't come from a stale state (state is empty before it > fires). > Might be a retry happening for some reason, which is OK in terms of > fault-tolerance guarantees (at-least-once), but not so much in terms of flaky > tests. > I'm looking into this hoping to fix this ASAP. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
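The arithmetic in the comment above (4, then 2, versus 6 on failure) can be reproduced with a small plain-Java simulation. It is a sketch under assumptions, not KafkaIO or Spark code: {{CheckpointResumeSketch}} and {{readBatch}} are hypothetical names, an in-memory list stands in for the topic, and a missing checkpoint is modelled as falling back to offset 0, i.e. "earliest".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalInt;

final class CheckpointResumeSketch {
  /**
   * Reads everything from the checkpointed offset onwards. If no checkpoint was
   * persisted, the reader falls back to offset 0 ("earliest") and re-reads the topic.
   */
  static List<String> readBatch(List<String> topic, OptionalInt checkpointedOffset) {
    int start = checkpointedOffset.orElse(0);
    return new ArrayList<>(topic.subList(start, topic.size()));
  }

  public static void main(String[] args) {
    List<String> topic = new ArrayList<>(Arrays.asList("e1", "e2", "e3", "e4"));

    // First run: nothing checkpointed yet, so all 4 injected elements are read.
    int firstRun = readBatch(topic, OptionalInt.empty()).size();
    OptionalInt checkpoint = OptionalInt.of(topic.size());

    // Two more elements are injected before the second run.
    topic.addAll(Arrays.asList("e5", "e6"));

    // Healthy resume reads only the new elements; a lost checkpoint re-reads everything.
    int resumed = readBatch(topic, checkpoint).size();
    int lostCheckpoint = readBatch(topic, OptionalInt.empty()).size();
    System.out.println(firstRun + " " + resumed + " " + lostCheckpoint);
  }
}
```

Seeing 6 elements in the second run is therefore consistent with the reader starting without its checkpoint, which is why the follow-up looks at guaranteeing that the checkpoint is written.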
[jira] [Created] (BEAM-1582) ResumeFromCheckpointStreamingTest flakes with what appears as a second firing.
Amit Sela created BEAM-1582: --- Summary: ResumeFromCheckpointStreamingTest flakes with what appears as a second firing. Key: BEAM-1582 URL: https://issues.apache.org/jira/browse/BEAM-1582 Project: Beam Issue Type: Bug Components: runner-spark Reporter: Amit Sela Assignee: Amit Sela See: https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/org.apache.beam$beam-runners-spark/2788/testReport/junit/org.apache.beam.runners.spark.translation.streaming/ResumeFromCheckpointStreamingTest/testWithResume/ After some digging in it appears that a second firing occurs (though only one is expected) but it doesn't come from a stale state (state is empty before it fires). Might be a retry happening for some reason, which is OK in terms of fault-tolerance guarantees (at-least-once), but not so much in terms of flaky tests. I'm looking into this hoping to fix this ASAP. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-111) Use SDK implementation of WritableCoder
[ https://issues.apache.org/jira/browse/BEAM-111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-111. Resolution: Fixed Fix Version/s: 0.6.0 > Use SDK implementation of WritableCoder > --- > > Key: BEAM-111 > URL: https://issues.apache.org/jira/browse/BEAM-111 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Amit Sela >Assignee: Ismaël Mejía > Labels: easy, starter > Fix For: 0.6.0 > > > The Spark runner currently uses its own implementation of WritableCoder; it > should use the one in {{io-hdfs}}. > Remove {{org.apache.beam.runners.spark.coders.WritableCoder}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (BEAM-849) Redesign PipelineResult API
[ https://issues.apache.org/jira/browse/BEAM-849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15890953#comment-15890953 ] Amit Sela edited comment on BEAM-849 at 3/1/17 8:14 PM: A continually growing file is something else, which I agree on, but in that case {{waitUntilFinish()}} would terminate after the file is done, right? By moving the WM to end-of-time? So if that happens implicitly, why call it explicitly? All I'm saying is that in batch, beginning and end are known ahead of execution, so blocking until termination is natural. In streaming, however, the end is unknown, so it's a bit awkward: some pipelines will behave like batch, like SDF-log-tail, and some won't, like reading from Pubsub/Kafka. I will agree that for the sake of a unified model it makes sense, but it's still a bit unnatural, so that's what I think this ticket is for: to try and reason about this and make it "feel" more natural, no? As for "unbounded pipelines" not being a part of the model, it's a bit confusing because it's all over the SDK. was (Author: amitsela): A continually growing file is something else, which I agree on, but in that case {{waitUntilFinish()}} would terminate after the file is done, right? By moving the WM to end-of-time? So if that happens implicitly, why call it explicitly? All I'm saying is that in batch, beginning and end are known ahead of execution, so blocking until termination is natural. In streaming, however, the end is unknown, so it's a bit awkward: some pipelines will behave the same, like SDF-log-tail, and some won't, like reading from Pubsub/Kafka. I will agree that for the sake of a unified model it makes sense, but it's still a bit unnatural, so that's what I think this ticket is for: to try and reason about this and make it "feel" more natural, no? As for "unbounded pipelines" not being a part of the model, it's a bit confusing because it's all over the SDK. 
> Redesign PipelineResult API > --- > > Key: BEAM-849 > URL: https://issues.apache.org/jira/browse/BEAM-849 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Pei He > > Current state: > Jira https://issues.apache.org/jira/browse/BEAM-443 addresses > waitUntilFinish() and cancel(). > However, there is additional work around PipelineResult: > need clearly defined contract and verification across all runners > need to revisit how to handle metrics/aggregators > need to be able to get logs -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-849) Redesign PipelineResult API
[ https://issues.apache.org/jira/browse/BEAM-849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15890953#comment-15890953 ] Amit Sela commented on BEAM-849: A continually growing file is something else, which I agree on, but in that case {{waitUntilFinish()}} would terminate after the file is done, right? By moving the WM to end-of-time? So if that happens implicitly, why call it explicitly? All I'm saying is that in batch, beginning and end are known ahead of execution, so blocking until termination is natural. In streaming, however, the end is unknown, so it's a bit awkward: some pipelines will behave the same, like SDF-log-tail, and some won't, like reading from Pubsub/Kafka. I will agree that for the sake of a unified model it makes sense, but it's still a bit unnatural, so that's what I think this ticket is for: to try and reason about this and make it "feel" more natural, no? As for "unbounded pipelines" not being a part of the model, it's a bit confusing because it's all over the SDK. > Redesign PipelineResult API > --- > > Key: BEAM-849 > URL: https://issues.apache.org/jira/browse/BEAM-849 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Pei He > > Current state: > Jira https://issues.apache.org/jira/browse/BEAM-443 addresses > waitUntilFinish() and cancel(). > However, there is additional work around PipelineResult: > need clearly defined contract and verification across all runners > need to revisit how to handle metrics/aggregators > need to be able to get logs -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (BEAM-849) Redesign PipelineResult API
[ https://issues.apache.org/jira/browse/BEAM-849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15890885#comment-15890885 ] Amit Sela edited comment on BEAM-849 at 3/1/17 7:35 PM: I disagree with stating that "Create.of(filename) + ParDo(tail file) + ParDo(process records)" in streaming leverages "low-latency"; that's very runner-specific, and generally inaccurate. How is sending a "filename" to worker/s + data locality slower than streaming the file to processors? was (Author: amitsela): I disagree with stating that "Create.of(filename) + ParDo(tail file) + ParDo(process records)" in streaming leverages "low-latency"; that's very runner-specific, and generally inaccurate. How is sending a "filename" to worker/s + data locality faster than streaming the file to processors? > Redesign PipelineResult API > --- > > Key: BEAM-849 > URL: https://issues.apache.org/jira/browse/BEAM-849 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Pei He > > Current state: > Jira https://issues.apache.org/jira/browse/BEAM-443 addresses > waitUntilFinish() and cancel(). > However, there is additional work around PipelineResult: > need clearly defined contract and verification across all runners > need to revisit how to handle metrics/aggregators > need to be able to get logs -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1562) Use a "signal" to stop streaming tests as they finish.
[ https://issues.apache.org/jira/browse/BEAM-1562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15890889#comment-15890889 ] Amit Sela commented on BEAM-1562: - I am using Watermarks ;-) but I persisted things to give all the information. > Use a "signal" to stop streaming tests as they finish. > -- > > Key: BEAM-1562 > URL: https://issues.apache.org/jira/browse/BEAM-1562 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > > Streaming tests use a timeout that has to take a large enough buffer to avoid > slow runtimes stopping before test completes. > We can introduce a "poison pill" based on an {{EndOfStream}} element that > would be counted in a Metrics/Aggregator to know all data was processed. > Another option would be to follow the slowest WM and stop once it hits > end-of-time. > This requires the Spark runner to stop blocking the execution of a streaming > pipeline until it's complete - which is something relevant to > {{PipelineResult}} (re)design that is being discussed in BEAM-849 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-849) Redesign PipelineResult API
[ https://issues.apache.org/jira/browse/BEAM-849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15890885#comment-15890885 ] Amit Sela commented on BEAM-849: I disagree with stating that "Create.of(filename) + ParDo(tail file) + ParDo(process records)" in streaming leverages "low-latency"; that's very runner-specific, and generally inaccurate. How is sending a "filename" to worker/s + data locality faster than streaming the file to processors? > Redesign PipelineResult API > --- > > Key: BEAM-849 > URL: https://issues.apache.org/jira/browse/BEAM-849 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Pei He > > Current state: > Jira https://issues.apache.org/jira/browse/BEAM-443 addresses > waitUntilFinish() and cancel(). > However, there is additional work around PipelineResult: > need clearly defined contract and verification across all runners > need to revisit how to handle metrics/aggregators > need to be able to get logs -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (BEAM-1562) Use a "signal" to stop streaming tests as they finish.
[ https://issues.apache.org/jira/browse/BEAM-1562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela updated BEAM-1562: Description: Streaming tests use a timeout that has to take a large enough buffer to avoid slow runtimes stopping before test completes. We can introduce a "poison pill" based on an {{EndOfStream}} element that would be counted in a Metrics/Aggregator to know all data was processed. Another option would be to follow the slowest WM and stop once it hits end-of-time. This requires the Spark runner to stop blocking the execution of a streaming pipeline until it's complete - which is something relevant to {{PipelineResult}} (re)design that is being discussed in BEAM-849 was: Streaming tests use a timeout that has to take a large enough buffer to avoid slow runtimes stopping before test completes. We can introduce a "poison pill" based on an {{EndOfStream}} element that would be counted in a Metrics/Aggregator to know all data was processed. This requires the Spark runner to stop blocking the execution of a streaming pipeline until it's complete - which is something relevant to {{PipelineResult}} (re)design that is being discussed in BEAM-849 > Use a "signal" to stop streaming tests as they finish. > -- > > Key: BEAM-1562 > URL: https://issues.apache.org/jira/browse/BEAM-1562 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > > Streaming tests use a timeout that has to take a large enough buffer to avoid > slow runtimes stopping before test completes. > We can introduce a "poison pill" based on an {{EndOfStream}} element that > would be counted in a Metrics/Aggregator to know all data was processed. > Another option would be to follow the slowest WM and stop once it hits > end-of-time. 
> This requires the Spark runner to stop blocking the execution of a streaming > pipeline until it's complete - which is something relevant to > {{PipelineResult}} (re)design that is being discussed in BEAM-849 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
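The second option in the updated description, following the slowest watermark until it hits end-of-time, could look roughly like the plain-Java sketch below. The names {{SlowestWatermarkSketch}}, {{slowestWatermark}} and {{shouldStop}} are hypothetical, and {{Long.MAX_VALUE}} is only an assumed stand-in for whatever end-of-time value the runner actually uses.

```java
import java.util.Arrays;
import java.util.Collection;

final class SlowestWatermarkSketch {
  // Assumed stand-in for "end-of-time"; the real runner constant may differ.
  static final long END_OF_TIME = Long.MAX_VALUE;

  /** Returns the minimum (slowest) watermark across all sources. */
  static long slowestWatermark(Collection<Long> watermarks) {
    if (watermarks.isEmpty()) {
      throw new IllegalArgumentException("At least one watermark is required");
    }
    long slowest = END_OF_TIME;
    for (long watermark : watermarks) {
      slowest = Math.min(slowest, watermark);
    }
    return slowest;
  }

  /** The test may stop only once even the slowest source has reached end-of-time. */
  static boolean shouldStop(Collection<Long> watermarks) {
    return slowestWatermark(watermarks) == END_OF_TIME;
  }

  public static void main(String[] args) {
    System.out.println(shouldStop(Arrays.asList(END_OF_TIME, 1_000L)));
    System.out.println(shouldStop(Arrays.asList(END_OF_TIME, END_OF_TIME)));
  }
}
```

Tracking the minimum rather than any single watermark means one lagging source is enough to keep the test running, which avoids stopping before all data has been processed.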
[jira] [Updated] (BEAM-1562) Use a "signal" to stop streaming tests as they finish.
[ https://issues.apache.org/jira/browse/BEAM-1562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela updated BEAM-1562: Summary: Use a "signal" to stop streaming tests as they finish. (was: Use a "poison pill" to stop streaming tests as they finish.) > Use a "signal" to stop streaming tests as they finish. > -- > > Key: BEAM-1562 > URL: https://issues.apache.org/jira/browse/BEAM-1562 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > > Streaming tests use a timeout that has to take a large enough buffer to avoid > slow runtimes stopping before test completes. > We can introduce a "poison pill" based on an {{EndOfStream}} element that > would be counted in a Metrics/Aggregator to know all data was processed. > This requires the Spark runner to stop blocking the execution of a streaming > pipeline until it's complete - which is something relevant to > {{PipelineResult}} (re)design that is being discussed in BEAM-849 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (BEAM-1562) Use a "poison pill" to stop streaming tests as they finish.
[ https://issues.apache.org/jira/browse/BEAM-1562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela reassigned BEAM-1562: --- Assignee: Amit Sela > Use a "poison pill" to stop streaming tests as they finish. > --- > > Key: BEAM-1562 > URL: https://issues.apache.org/jira/browse/BEAM-1562 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > > Streaming tests use a timeout that has to take a large enough buffer to avoid > slow runtimes stopping before test completes. > We can introduce a "poison pill" based on an {{EndOfStream}} element that > would be counted in a Metrics/Aggregator to know all data was processed. > This requires the Spark runner to stop blocking the execution of a streaming > pipeline until it's complete - which is something relevant to > {{PipelineResult}} (re)design that is being discussed in BEAM-849 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-1576) Use UnsupportedSideInputReader in GroupAlsoByWindowEvaluatorFactory.
[ https://issues.apache.org/jira/browse/BEAM-1576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-1576. - Resolution: Fixed Fix Version/s: 0.6.0 > Use UnsupportedSideInputReader in GroupAlsoByWindowEvaluatorFactory. > > > Key: BEAM-1576 > URL: https://issues.apache.org/jira/browse/BEAM-1576 > Project: Beam > Issue Type: Bug > Components: runner-direct >Reporter: Amit Sela >Assignee: Amit Sela >Priority: Minor > Fix For: 0.6.0 > > > Now that {{runners-core}} has an {{UnsupportedSideInputReader}} we can use it > in Direct runner too. > I didn't see any other refactors that I could do around this across the > project so it's tagged on the direct runner only. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1576) Use UnsupportedSideInputReader in GroupAlsoByWindowEvaluatorFactory.
Amit Sela created BEAM-1576: --- Summary: Use UnsupportedSideInputReader in GroupAlsoByWindowEvaluatorFactory. Key: BEAM-1576 URL: https://issues.apache.org/jira/browse/BEAM-1576 Project: Beam Issue Type: Bug Components: runner-direct Reporter: Amit Sela Assignee: Amit Sela Priority: Minor Now that {{runners-core}} has an {{UnsupportedSideInputReader}} we can use it in Direct runner too. I didn't see any other refactors that I could do around this across the project so it's tagged on the direct runner only. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-920) Support triggers, panes and watermarks.
[ https://issues.apache.org/jira/browse/BEAM-920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-920. Resolution: Fixed Fix Version/s: 0.6.0 > Support triggers, panes and watermarks. > --- > > Key: BEAM-920 > URL: https://issues.apache.org/jira/browse/BEAM-920 > Project: Beam > Issue Type: New Feature > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > Fix For: 0.6.0 > > > Implement event-time based aggregation using triggers, panes and watermarks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1556) Spark executors need to register IO factories
[ https://issues.apache.org/jira/browse/BEAM-1556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888512#comment-15888512 ] Amit Sela commented on BEAM-1556: - Agree, unblock users first. [~jbonofre] I can help with this if you want, ping me. > Spark executors need to register IO factories > - > > Key: BEAM-1556 > URL: https://issues.apache.org/jira/browse/BEAM-1556 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Frances Perry >Assignee: Jean-Baptiste Onofré > > The Spark executors need to call IOChannelUtils.registerIOFactories(options) > in order to support GCS file and make the default WordCount example work. > Context in this thread: > https://lists.apache.org/thread.html/469a139c9eb07e64e514cdea42ab8000678ab743794a090c365205d7@%3Cuser.beam.apache.org%3E -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1556) Spark executors need to register IO factories
[ https://issues.apache.org/jira/browse/BEAM-1556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887964#comment-15887964 ] Amit Sela commented on BEAM-1556: - My reasoning for having this in the SDK (or better, the Runner API) is that the runner would have to initialize the registration for every instance, mostly workers (the implementation of {{PipelineRunner}} would probably take care of it for the "Driver" instance). Not all {{DoFn}}s require this, and neither do all readers/writers, so either it is initialized all the time (whether needed or not) or the runner has to be patched for every new use case: Read, Write, DoFn... I'm not sure I'm going to like the following suggestion (fighting with myself a bit here), but how about a {{FileSystemContext}}? The runner would have to initialize it and pass it on to the SDK. Not sure here.. thoughts? > Spark executors need to register IO factories > - > > Key: BEAM-1556 > URL: https://issues.apache.org/jira/browse/BEAM-1556 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Frances Perry >Assignee: Jean-Baptiste Onofré > > The Spark executors need to call IOChannelUtils.registerIOFactories(options) > in order to support GCS file and make the default WordCount example work. > Context in this thread: > https://lists.apache.org/thread.html/469a139c9eb07e64e514cdea42ab8000678ab743794a090c365205d7@%3Cuser.beam.apache.org%3E -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1565) Update Spark runner PostCommit Jenkins job.
Amit Sela created BEAM-1565: --- Summary: Update Spark runner PostCommit Jenkins job. Key: BEAM-1565 URL: https://issues.apache.org/jira/browse/BEAM-1565 Project: Beam Issue Type: Bug Components: runner-spark Reporter: Amit Sela Priority: Minor The job should only activate the {{runnable-on-service}} profile (and {{streaming-runnable-on-service}} once enabled) and should drop {{-Dspark.port.maxRetries=64}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1562) Use a "poison pill" to stop streaming tests as they finish.
Amit Sela created BEAM-1562: --- Summary: Use a "poison pill" to stop streaming tests as they finish. Key: BEAM-1562 URL: https://issues.apache.org/jira/browse/BEAM-1562 Project: Beam Issue Type: Improvement Components: runner-spark Reporter: Amit Sela Streaming tests use a timeout that has to take a large enough buffer to avoid slow runtimes stopping before test completes. We can introduce a "poison pill" based on an {{EndOfStream}} element that would be counted in a Metrics/Aggregator to know all data was processed. This requires the Spark runner to stop blocking the execution of a streaming pipeline until it's complete - which is something relevant to {{PipelineResult}} (re)design that is being discussed in BEAM-849 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
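The end-of-stream idea in the ticket above can be sketched in plain Java. This is only an illustration under assumptions, not Beam or Spark runner code: the class name, the {{END_OF_STREAM}} marker value, the list-of-batches input and the {{partitions}} parameter are all hypothetical, and an {{AtomicInteger}} stands in for the Metrics/Aggregator counter the ticket mentions. The point is that the run stops as soon as the counter shows that every source partition has delivered its marker, rather than waiting for a timeout.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; none of these names exist in Beam or the Spark runner.
class EndOfStreamSignalSketch {
    // Assumed marker element appended to the end of each source partition.
    static final String END_OF_STREAM = "<EndOfStream>";

    /**
     * Consumes micro-batches until every partition has delivered its marker.
     * Returns the number of data (non-marker) elements processed.
     */
    static int runUntilSignalled(List<List<String>> batches, int partitions) {
        // Stand-in for a Metrics/Aggregator counter of marker elements.
        AtomicInteger endOfStreamCount = new AtomicInteger();
        int processed = 0;
        for (List<String> batch : batches) {
            for (String element : batch) {
                if (END_OF_STREAM.equals(element)) {
                    endOfStreamCount.incrementAndGet();
                } else {
                    processed++;
                }
            }
            // All partitions signalled: stop now instead of waiting for a timeout.
            if (endOfStreamCount.get() >= partitions) {
                break;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
            Arrays.asList("a", "b"),
            Arrays.asList("c", END_OF_STREAM),
            Arrays.asList(END_OF_STREAM),
            Arrays.asList("never-read"));
        System.out.println("data elements processed: " + runUntilSignalled(batches, 2));
    }
}
```

In this sketch the last batch is never consumed, because both assumed partitions have already reported their marker by the third batch.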
[jira] [Created] (BEAM-1564) Support streaming side-inputs in the Spark runner.
Amit Sela created BEAM-1564: --- Summary: Support streaming side-inputs in the Spark runner. Key: BEAM-1564 URL: https://issues.apache.org/jira/browse/BEAM-1564 Project: Beam Issue Type: Bug Components: runner-spark Reporter: Amit Sela Assignee: Amit Sela Currently an unbounded PCollection can use side-inputs from a bounded PCollection but there's no proper support for streaming sources that update the side-input. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1563) Flatten Spark runner libraries.
Amit Sela created BEAM-1563: --- Summary: Flatten Spark runner libraries. Key: BEAM-1563 URL: https://issues.apache.org/jira/browse/BEAM-1563 Project: Beam Issue Type: Improvement Components: runner-spark Reporter: Amit Sela Assignee: Amit Sela Flatten Spark runner libraries into a single package (or two) so that everything is private. See [~kenn]'s comment: https://github.com/apache/beam/pull/2050 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1556) Spark executors need to register IO factories
[ https://issues.apache.org/jira/browse/BEAM-1556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15884602#comment-15884602 ] Amit Sela commented on BEAM-1556: - [~frances] I wonder if this is an issue with other runners as well, since I don't see any runner (apart from {{DataflowRunner}}) calling {{IOChannelUtils}} in their code. As for the Spark runner, it looks like {{SourceRDD.Bounded}} needs to initialize IO factory registration on the workers, using deserialized PipelineOptions as Frances mentioned. I wouldn't do this as part of {{SparkRuntimeContext#deserializePipelineOptions(String)}} since it is only used when reading from a FileBasedSource (writing to Sink). I wonder if this initialization shouldn't be part of {{FileBasedReader}} construction? > Spark executors need to register IO factories > - > > Key: BEAM-1556 > URL: https://issues.apache.org/jira/browse/BEAM-1556 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Frances Perry >Assignee: Jean-Baptiste Onofré > > The Spark executors need to call IOChannelUtils.registerIOFactories(options) > in order to support GCS file and make the default WordCount example work. > Context in this thread: > https://lists.apache.org/thread.html/469a139c9eb07e64e514cdea42ab8000678ab743794a090c365205d7@%3Cuser.beam.apache.org%3E -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-982) Document spark configuration update
[ https://issues.apache.org/jira/browse/BEAM-982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15883673#comment-15883673 ] Amit Sela commented on BEAM-982: [~jbonofre] {{spark.serializer}} is already hard-coded into the runner, and takes highest precedence. I don't believe {{spark.kryoserializer.buffer.max}} was related specifically to your problem. > Document spark configuration update > --- > > Key: BEAM-982 > URL: https://issues.apache.org/jira/browse/BEAM-982 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Jean-Baptiste Onofré >Priority: Minor > > When using {{spark-submit}} to submit a pipeline to a spark cluster, I had to > change the {{conf/spark-defaults.conf}} with the following: > {code} > spark.serializer org.apache.spark.serializer.KryoSerializer > spark.kryoserializer.buffer.max 1024 > {code} > I think it would make sense to add a note in the Spark README or website > about this. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (BEAM-982) Document spark configuration update
[ https://issues.apache.org/jira/browse/BEAM-982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15883673#comment-15883673 ] Amit Sela edited comment on BEAM-982 at 2/24/17 10:49 PM: -- [~jbonofre] {{spark.serializer}} is already hard-coded into the runner, and takes highest precedence. I don't believe {{spark.kryoserializer.buffer.max}} was related specifically to your problem. Any reason not to close this ? was (Author: amitsela): [~jbonofre] {{spark.serializer}} is already hard-coded into the runner, and takes highest precedence. I don't believe {{spark.kryoserializer.buffer.max}} was related specifically to your problem. > Document spark configuration update > --- > > Key: BEAM-982 > URL: https://issues.apache.org/jira/browse/BEAM-982 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Jean-Baptiste Onofré >Priority: Minor > > When using {{spark-submit}} to submit a pipeline to a spark cluster, I had to > change the {{conf/spark-defaults.conf}} with the following: > {code} > spark.serializer org.apache.spark.serializer.KryoSerializer > spark.kryoserializer.buffer.max 1024 > {code} > I think it would make sense to add a note in the Spark README or website > about this. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (BEAM-229) Add support for additional Spark configuration via PipelineOptions
[ https://issues.apache.org/jira/browse/BEAM-229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela closed BEAM-229. -- Resolution: Won't Fix Fix Version/s: Not applicable Spark configurations can be passed as system properties or via Spark configuration as part of the deployment / spark-defaults.conf. I don't see a reason to mix it into {{PipelineOptions}}. > Add support for additional Spark configuration via PipelineOptions > -- > > Key: BEAM-229 > URL: https://issues.apache.org/jira/browse/BEAM-229 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Amit Sela > Fix For: Not applicable > > > Provide support for at least some of the Spark configurations via > PipelineOptions - memory, cores, etc. > Need to pass PipelineOptions to > https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java > where the SparkConf is created. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
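To make the "system properties" route above concrete, here is a small plain-Java sketch. It is an assumption-laden imitation, not Spark code: the class and method names are hypothetical, and it only mimics the behaviour, as far as I know, of a default-constructed {{SparkConf}}, which picks up JVM system properties whose names start with {{spark.}}. The keys {{spark.executor.memory}} and {{spark.executor.cores}} are used as examples of the memory/cores settings the ticket asks about.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical sketch; imitates how "spark."-prefixed properties could be collected.
class SparkSystemPropertiesSketch {
    /** Returns only the entries whose key starts with "spark.", sorted by key. */
    static Map<String, String> sparkSettings(Properties props) {
        Map<String, String> settings = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith("spark.")) {
                settings.put(name, props.getProperty(name));
            }
        }
        return settings;
    }

    public static void main(String[] args) {
        // Equivalent to launching the JVM with -Dspark.executor.memory=4g etc.
        System.setProperty("spark.executor.memory", "4g");
        System.setProperty("spark.executor.cores", "2");
        sparkSettings(System.getProperties())
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With this route nothing Spark-specific has to be threaded through {{PipelineOptions}}; the settings travel with the JVM or the deployment configuration.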
[jira] [Commented] (BEAM-981) Not possible to directly submit a pipeline on spark cluster
[ https://issues.apache.org/jira/browse/BEAM-981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15883644#comment-15883644 ] Amit Sela commented on BEAM-981: [~jbonofre] any news on this one ? > Not possible to directly submit a pipeline on spark cluster > --- > > Key: BEAM-981 > URL: https://issues.apache.org/jira/browse/BEAM-981 > Project: Beam > Issue Type: Bug > Components: runner-spark >Affects Versions: 0.3.0-incubating, 0.4.0 >Reporter: Jean-Baptiste Onofré > > It's not possible to directly run a pipeline on the spark runner (for > instance using {{mvn exec:java}}. It fails with: > {code} > [appclient-register-master-threadpool-0] INFO > org.apache.spark.deploy.client.AppClient$ClientEndpoint - Connecting to > master spark://10.200.118.197:7077... > [shuffle-client-0] ERROR org.apache.spark.network.client.TransportClient - > Failed to send RPC 6813731522650020739 to /10.200.118.197:7077: > java.lang.AbstractMethodError: > org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted; > java.lang.AbstractMethodError: > org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted; > at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:73) > at > io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:107) > at > io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:820) > at > io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:733) > at > io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:111) > at > io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:748) > at > io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:740) > at > 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:826) > at > io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:733) > at > io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:284) > at > io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:748) > at > io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:740) > at > io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38) > at > io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1101) > at > io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1148) > at > io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1090) > at > io.netty.util.concurrent.SingleThreadEventExecutor.safeExecute(SingleThreadEventExecutor.java:451) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:401) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877) > at java.lang.Thread.run(Thread.java:745) > [appclient-register-master-threadpool-0] WARN > org.apache.spark.deploy.client.AppClient$ClientEndpoint - Failed to connect > to master 10.200.118.197:7077 > java.io.IOException: Failed to send RPC 6813731522650020739 to > /10.200.118.197:7077: java.lang.AbstractMethodError: > org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted; > at > org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239) > at > org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226) > at > 
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:514) > at > io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:507) > at > io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:486) > at > io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:427) > at > io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:129) > at > io.netty.channel.AbstractChannelHandlerContext.notifyOutboundHandlerException(AbstractChannelHandlerContext.java:845) > at > io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:750) >
[jira] [Closed] (BEAM-18) Add support for new Beam Sink API
[ https://issues.apache.org/jira/browse/BEAM-18?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela closed BEAM-18. - Resolution: Won't Fix Fix Version/s: Not applicable Currently, Beam Sinks are implemented via {{ParDo}}s and the Spark runner will trigger an action-less leaf so I don't see a reason to keep this ticket. If anything changes in the Sink API this could be revisited. > Add support for new Beam Sink API > - > > Key: BEAM-18 > URL: https://issues.apache.org/jira/browse/BEAM-18 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Amit Sela > Fix For: Not applicable > > > Support Beam Sinks via Spark {{foreach}} API, Spark's native sinks (if / > where) possible. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (BEAM-668) Reinstate runner direct translation for TextIO and AvroIO once Beam SDK supports hdfs.
[ https://issues.apache.org/jira/browse/BEAM-668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela closed BEAM-668. -- Resolution: Won't Fix Fix Version/s: Not applicable It doesn't seem maintainable to directly translate IOs in general because Beam and Spark are not in sync. > Reinstate runner direct translation for TextIO and AvroIO once Beam SDK > supports hdfs. > -- > > Key: BEAM-668 > URL: https://issues.apache.org/jira/browse/BEAM-668 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > Fix For: Not applicable > > > The runner has an implementation of a direct translation (not with > Read.Bounded) for TextIO and AvroIO. > Those cannot be used properly until BEAM-59 is resolved and hdfs is properly > supported. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (BEAM-848) A better shuffle after reading from within mapWithState.
[ https://issues.apache.org/jira/browse/BEAM-848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela updated BEAM-848: --- Issue Type: Improvement (was: Bug) > A better shuffle after reading from within mapWithState. > > > Key: BEAM-848 > URL: https://issues.apache.org/jira/browse/BEAM-848 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela > > The SparkRunner uses {{mapWithState}} to read and manage CheckpointMarks, and > this stateful operation will be followed by a shuffle: > https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala#L159 > Since the stateful read maps "splitSource" -> "partition of a list of read > values", the following shuffle won't benefit in any way (the list of read > values has not been flatMapped yet). In order to avoid shuffle we need to set > the input RDD ({{SourceRDD.Unbounded}}) partitioner to be a default > {{HashPartitioner}} since {{mapWithState}} would use the same partitioner and > will skip shuffle if the partitioners match. > It would be wise to shuffle the read values _after_ flatmap. > I will break this into two tasks: > # Set default-partitioner to the input RDD. > # Shuffle (using Coders) the input. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
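The partitioner-matching argument above can be illustrated with a simplified stand-in written in plain Java. This is a sketch under assumptions, not Spark or runner code: {{HashPartitionerSketch}} and {{needsShuffle}} are hypothetical names. It models two properties that, to my knowledge, Spark's {{HashPartitioner}} has: keys are assigned by a non-negative modulo of the hash code, and two partitioners are equal when their partition counts are equal. A shuffle is then needed only when the input is not already partitioned the way the stateful operation requires.

```java
import java.util.Objects;

// Hypothetical sketch; a simplified model, not Spark's actual classes.
class PartitionerMatchSketch {
    /** Simplified hash partitioner: equality is decided by partition count only. */
    static final class HashPartitionerSketch {
        final int numPartitions;

        HashPartitionerSketch(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        /** Non-negative modulo of the key's hash code. */
        int partitionFor(Object key) {
            return Math.floorMod(Objects.hashCode(key), numPartitions);
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof HashPartitionerSketch
                && ((HashPartitionerSketch) other).numPartitions == numPartitions;
        }

        @Override
        public int hashCode() {
            return numPartitions;
        }
    }

    /** A shuffle is required unless the input already uses an equal partitioner. */
    static boolean needsShuffle(HashPartitionerSketch input, HashPartitionerSketch required) {
        return !required.equals(input);
    }

    public static void main(String[] args) {
        HashPartitionerSketch required = new HashPartitionerSketch(4);
        System.out.println("no partitioner set: shuffle=" + needsShuffle(null, required));
        System.out.println("matching partitioner: shuffle="
            + needsShuffle(new HashPartitionerSketch(4), required));
    }
}
```

This corresponds to the first task in the ticket: once the input RDD carries the same default partitioner, the stateful step has no reason to repartition.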
[jira] [Resolved] (BEAM-1526) Flakes in Spark runner WatermarkTest.testInDoFn
[ https://issues.apache.org/jira/browse/BEAM-1526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-1526. - Resolution: Fixed Fix Version/s: 0.6.0 > Flakes in Spark runner WatermarkTest.testInDoFn > --- > > Key: BEAM-1526 > URL: https://issues.apache.org/jira/browse/BEAM-1526 > Project: Beam > Issue Type: Bug > Components: runner-spark >Reporter: Kenneth Knowles >Assignee: Thomas Groh > Fix For: 0.6.0 > > > https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7612/org.apache.beam$beam-runners-spark/testReport/junit/org.apache.beam.runners.spark/WatermarkTest/testInDoFn/ > We should probably add {{@Ignore}} to the test for now, and remove it after > the next PR changes everything, and so on. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-1512) Optimize leaf transforms materialization
[ https://issues.apache.org/jira/browse/BEAM-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-1512. - Resolution: Fixed Fix Version/s: 0.6.0 > Optimize leaf transforms materialization > > > Key: BEAM-1512 > URL: https://issues.apache.org/jira/browse/BEAM-1512 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Aviem Zur >Assignee: Aviem Zur > Fix For: 0.6.0 > > > Optimize leaf materialization in {{EvaluationContext}}. Use register for > DStream leaves and an empty {{foreachPartition}} for other leaves instead of > the current {{count()}}, which adds overhead. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (BEAM-1492) Avoid potential issue in ASM 5.0
[ https://issues.apache.org/jira/browse/BEAM-1492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872065#comment-15872065 ] Amit Sela commented on BEAM-1492: - [~kenn] The Spark and Apex runners both use Kryo 2.x, which transitively relies on ASM 4.0. Gearpump uses {{akka-kryo-serialization}}, which depends on Kryo 3.x and so transitively relies on ASM 5.x. This could become a problem for Spark runner V2 once that happens, though, since branch 2.1 of Spark depends on Kryo 3.0.3. I'll keep an eye on it! > Avoid potential issue in ASM 5.0 > > > Key: BEAM-1492 > URL: https://issues.apache.org/jira/browse/BEAM-1492 > Project: Beam > Issue Type: Task > Components: sdk-java-core >Reporter: Kenneth Knowles >Assignee: Amit Sela > > There is a suspected bug in asm 5.0 that is considered the likely root cause > of a [bug in > sbt-assembly|https://github.com/sbt/sbt-assembly/issues/205#issuecomment-279964607] > that carried over to [GEARPUMP-236]. I have not found a direct reference to > what the issue is, precisely, but the dependency effect of this is extremely > small and these are libraries that are useful to keep current. And if/when > Gearpump runner lands on master this will avoid any diamond dep issues. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-774) Implement Metrics support for Spark runner
[ https://issues.apache.org/jira/browse/BEAM-774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-774. Resolution: Fixed Fix Version/s: 0.6.0 > Implement Metrics support for Spark runner > -- > > Key: BEAM-774 > URL: https://issues.apache.org/jira/browse/BEAM-774 > Project: Beam > Issue Type: Sub-task > Components: runner-spark >Reporter: Ben Chambers >Assignee: Aviem Zur > Fix For: 0.6.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (BEAM-1470) A quiet logger for testing
[ https://issues.apache.org/jira/browse/BEAM-1470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela resolved BEAM-1470. - Resolution: Fixed Fix Version/s: 0.6.0 > A quiet logger for testing > -- > > Key: BEAM-1470 > URL: https://issues.apache.org/jira/browse/BEAM-1470 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Amit Sela >Assignee: Amit Sela >Priority: Minor > Fix For: 0.6.0 > > > Make the test logger quiet and clean to avoid flooding build logs and to allow > better visibility in case something breaks, since current logging is very > crowded. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (BEAM-1470) A quiet logger for testing
Amit Sela created BEAM-1470: --- Summary: A quiet logger for testing Key: BEAM-1470 URL: https://issues.apache.org/jira/browse/BEAM-1470 Project: Beam Issue Type: Improvement Components: runner-spark Reporter: Amit Sela Assignee: Amit Sela Priority: Minor Make the test logger quiet and clean to avoid flooding build logs and to allow better visibility in case something breaks, since current logging is very crowded. -- This message was sent by Atlassian JIRA (v6.3.15#6346)