Hi, I am implementing the WordCount example from org.apache.beam.runners.spark.examples using beam-release-0.6.0.
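In essence, the pipeline wiring looks like this (a condensed sketch of my main(); CountWords, FormatAsTextFn and WordCountOptions are the classes from the full code further down, and the provided-Spark-context setup is elided here):

  WordCountOptions options = PipelineOptionsFactory.as(WordCountOptions.class);
  options.setRunner(SparkRunner.class);
  options.setSparkMaster("spark://192.168.1.1:7077");
  options.setUsesProvidedSparkContext(true);
  options.setProvidedSparkContext(javaSparkContext); // the context built in createJavaSparkContext() below

  Pipeline p = Pipeline.create(options);
  p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))   // reading works fine
      .apply(new CountWords())                                     // adding this step leads to the exception below
      .apply(MapElements.via(new FormatAsTextFn()))
      .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
  p.run().waitUntilFinish();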
I have made some changes to this program and I am trying to execute it on a machine across the LAN. I am able to create the pipeline and read the input, but when I apply .apply(new CountWords()) it gives me this exception:

17/05/30 23:56:22 INFO SparkRunner: Executing pipeline using the SparkRunner.
17/05/30 23:56:22 INFO SparkContextFactory: Using a provided Spark Context
17/05/30 23:56:23 INFO SparkRunner$Evaluator: Evaluating Read(CompressedSource)
17/05/30 23:56:23 INFO SparkRunner$Evaluator: Evaluating ParDo(ExtractWords)
17/05/30 23:56:23 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(null) (192.168.1.1:53428) with ID 1
17/05/30 23:56:23 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.1:35482 with 366.3 MB RAM, BlockManagerId(1, 192.168.1.214, 35482, None)
Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.RuntimeException: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
    at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
    at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
    at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
    at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
    at com.impressico.apache.beam.word.count.vs.WordCountVS.main(WordCountVS.java:177)
    ... 6 more
Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:163)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
    at org.apache.spark.api.java.JavaRDDLike$class.mapPartitions(JavaRDDLike.scala:155)
    at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitions(JavaRDDLike.scala:45)
    at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:348)
    at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:328)
    at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:374)
    at org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:360)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:486)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$400(TransformHierarchy.java:231)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:206)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:321)
    at org.apache.beam.runners.spark.SparkRunner$2.run(SparkRunner.java:226)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I have been trying to explore this exception, but there is not much help available for it, so I am sharing my sample code:

package com.impressico.apache.beam.word.count.vs;

import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.JavaSerializer;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCount implements JavaStreamingContextFactory {

  private static final Logger logger = LoggerFactory.getLogger(WordCount.class);

  public static String path = "/home/username/kinglear.txt";
  public static String username = "username";
  public static String sparkMaster = "spark://192.168.1.1:7077";
  public static String appName = "SparkRunnerFileRead";
  public static String pathToWrite = "/home/username/beamWorks/output.txt";
  public static LongAccumulator longAccumulator;

  static class ExtractWordsFn extends DoFn<String, String> {
    private final Aggregator<Long, Long> emptyLines =
        createAggregator("emptyLines", Sum.ofLongs());

    @ProcessElement
    public void processElement(ProcessContext c) {
      if (c.element().trim().isEmpty()) {
        emptyLines.addValue(1L);
      }
      // Split the line into words.
      String[] words = c.element().split("[^a-zA-Z']+");
      // Output each word encountered into the output PCollection.
      for (String word : words) {
        if (!word.isEmpty()) {
          c.output(word);
        }
      }
    }
  }

  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      logger.info("return input.getKey() + : + input.getValue()");
      return input.toString();
    }
  }

  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement());
      longAccumulator.add(wordCounts.expand().size());
      return wordCounts;
    }
  }

  public interface WordCountOptions extends SparkContextOptions {
    @Description("Path of the file to read from")
    @Default.String("/home/username/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);

    @Description("Path of the file to write to")
    @Default.String("/home/username/beamWorks/output.txt")
    String getOutput();
    void setOutput(String value);
  }

  public static void main(String[] args) {
    WordCount wc = new WordCount();
    System.setProperty("username", username);
    WordCountOptions sparkContextOptions = PipelineOptionsFactory.as(WordCountOptions.class);
    sparkContextOptions.setSparkMaster(sparkMaster);
    sparkContextOptions.setAppName("BeamReadOperations");
    sparkContextOptions.setStreaming(false);
    sparkContextOptions.setRunner(SparkRunner.class);
    sparkContextOptions.setEnableSparkMetricSinks(true);
    sparkContextOptions.setJobName("BeamReadOperations Job");
    sparkContextOptions.as(SparkPipelineOptions.class);
    sparkContextOptions.setCheckpointDurationMillis(1000L);
    sparkContextOptions.setMaxRecordsPerBatch(1000L);
    sparkContextOptions.setOptionsId(1000L);
    sparkContextOptions.setProvidedSparkContext(wc.create().sparkContext());
    sparkContextOptions.setUsesProvidedSparkContext(true);

    Pipeline p = Pipeline.create(sparkContextOptions);
    p.apply("ReadLines", TextIO.Read.from(sparkContextOptions.getInputFile()))
        .apply(new CountWords())
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply("WriteCounts", TextIO.Write.to(sparkContextOptions.getOutput()));
    p.run().waitUntilFinish();
  }

  public JavaStreamingContext create() {
    return createJavaStreamingContext();
  }

  private static JavaSparkContext createJavaSparkContext() {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName(appName);
    sparkConf.setMaster(sparkMaster);
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
    JavaSerializer javaSerializer = new JavaSerializer(sparkConf);
    javaSerializer.newInstance();
    javaSerializer.org$apache$spark$serializer$JavaSerializer$$counterReset();
    SparkContext sparkContext = new SparkContext(sparkMaster, appName, sparkConf);
    longAccumulator = sparkContext.longAccumulator();
    longAccumulator.setValue(100L);
    JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sparkContext);
    return javaSparkContext;
  }

  private JavaStreamingContext createJavaStreamingContext() {
    JavaStreamingContext javaStreamingContext =
        new JavaStreamingContext(createJavaSparkContext(), new Duration(10000L));
    return javaStreamingContext;
  }
}

Just for info, I am using these dependencies:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>0.6.0</version>
</dependency>
<!--
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-hdfs</artifactId>
  <version>0.6.0</version>
</dependency>
-->
<!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-core -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>0.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.6.3</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.3</version>
</dependency>

and the spark-submit command I use:
spark-submit --class com.impressico.apache.beam.word.count.vs.WordCount \
  --master spark://192.168.1.1:6066 \
  --deploy-mode cluster \
  /home/username/beamWorks/WordCount.jar \
  --runner=SparkRunner --name WordCount

Any inputs will be very helpful for me.

Thanks,
Vaibhav Sharma
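PS: just to illustrate what the word splitting in ExtractWordsFn does, here is a throwaway check (plain Java, no Beam or Spark involved; the class name SplitCheck is only for illustration):

public class SplitCheck {
  public static void main(String[] args) {
    String line = " to be, or not to be: that is the question";
    // Same regex as ExtractWordsFn: split on any run of characters that are not letters or apostrophes.
    String[] words = line.split("[^a-zA-Z']+");
    // Prints: [, to, be, or, not, to, be, that, is, the, question]
    // The leading empty string comes from the leading space, which is why ExtractWordsFn skips empty words.
    System.out.println(java.util.Arrays.toString(words));
  }
}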