Hi,

I am implementing the WordCount example from org.apache.beam.runners.spark.examples using beam-release-0.6.0.

I have made some changes to this program and am trying to execute it on a machine across the LAN. I am able to create the pipeline and read the input lines, but when I apply

.apply(new CountWords())

it gives me this exception:

17/05/30 23:56:22 INFO SparkRunner: Executing pipeline using the SparkRunner.
17/05/30 23:56:22 INFO SparkContextFactory: Using a provided Spark Context
17/05/30 23:56:23 INFO SparkRunner$Evaluator: Evaluating Read(CompressedSource)
17/05/30 23:56:23 INFO SparkRunner$Evaluator: Evaluating ParDo(ExtractWords)
17/05/30 23:56:23 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(null) (192.168.1.1:53428) with ID 1
17/05/30 23:56:23 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.1:35482 with 366.3 MB RAM, BlockManagerId(1, 192.168.1.214, 35482, None)
Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
        at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.RuntimeException: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
        at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
        at com.impressico.apache.beam.word.count.vs.WordCountVS.main(WordCountVS.java:177)
        ... 6 more
Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
        at scala.Predef$.assert(Predef.scala:170)
        at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:163)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
        at org.apache.spark.api.java.JavaRDDLike$class.mapPartitions(JavaRDDLike.scala:155)
        at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitions(JavaRDDLike.scala:45)
        at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:348)
        at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:328)
        at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:374)
        at org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:360)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:486)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$400(TransformHierarchy.java:231)
        at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:206)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:321)
        at org.apache.beam.runners.spark.SparkRunner$2.run(SparkRunner.java:226)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

I have been trying to investigate this exception, but there is not much help available for it, so I am sharing my sample code below.
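
To give a bit more context on my reading of the error: the assertion appears to be raised from Spark's AccumulatorV2.writeReplace (see the trace above), which, as far as I understand, requires copyAndReset() to return a zero-valued copy before an accumulator is serialized to the executors. The following is only my own minimal Java illustration of that contract, written to check my understanding of the message; it is not Spark's actual code:

// My own sketch (assumption, not Spark source) of the contract the assertion enforces:
// before an accumulator is shipped to executors, copyAndReset() is called and the
// resulting copy must report isZero() == true, otherwise the assertion fails.
public class CopyAndResetSketch {

    static class LongAccumulatorSketch {
        private long value;

        LongAccumulatorSketch(long value) { this.value = value; }

        boolean isZero() { return value == 0L; }

        // A well-behaved accumulator returns a fresh, zero-valued copy here.
        LongAccumulatorSketch copyAndReset() { return new LongAccumulatorSketch(0L); }
    }

    // Rough equivalent of the check that fails in AccumulatorV2.writeReplace.
    static void checkBeforeSerialization(LongAccumulatorSketch acc) {
        LongAccumulatorSketch copy = acc.copyAndReset();
        if (!copy.isZero()) {
            throw new AssertionError("assertion failed: copyAndReset must return a zero value copy");
        }
    }

    public static void main(String[] args) {
        // Passes with the implementation above, even when the accumulator already holds a value.
        checkBeforeSerialization(new LongAccumulatorSketch(100L));
        System.out.println("copyAndReset contract satisfied");
    }
}

In my program the only accumulator I create myself is the LongAccumulator in createJavaSparkContext(), where I call setValue(100L) before the pipeline runs; I do not know whether that is related.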

package com.impressico.apache.beam.word.count.vs;

import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.JavaSerializer;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCount implements JavaStreamingContextFactory {

    private static final Logger logger = LoggerFactory.getLogger(WordCount.class);

    public static String path = "/home/username/kinglear.txt";
    public static String username = "username";
    public static String sparkMaster = "spark://192.168.1.1:7077";
    public static String appName = "SparkRunnerFileRead";
    public static String pathToWrite = "/home/username/beamWorks/output.txt";
    public static LongAccumulator longAccumulator;

    static class ExtractWordsFn extends DoFn<String, String> {
        private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", Sum.ofLongs());

        @ProcessElement
        public void processElement(ProcessContext c) {
            if (c.element().trim().isEmpty()) {
                emptyLines.addValue(1L);
            }

            // Split the line into words.
            String[] words = c.element().split("[^a-zA-Z']+");

            // Output each word encountered into the output PCollection.
            for (String word : words) {
                if (!word.isEmpty()) {
                    c.output(word);
                }
            }
        }
    }

    public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
        @Override
        public String apply(KV<String, Long> input) {
            logger.info("return input.getKey() + :  + input.getValue()");
            return input.toString();
        }
    }

    public static class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
        @Override
        public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
            // Convert lines of text into individual words.
            PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

            // Count the number of times each word occurs.
            PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement());
            longAccumulator.add(wordCounts.expand().size());
            return wordCounts;
        }
    }

    public interface WordCountOptions extends SparkContextOptions {
        @Description("Path of the file to read from")
        @Default.String("/home/username/kinglear.txt")
        String getInputFile();

        void setInputFile(String value);

        @Description("Path of the file to write to")
        @Default.String("/home/username/beamWorks/output.txt")
        String getOutput();

        void setOutput(String value);
    }

    public static void main(String[] args) {
        WordCount wc = new WordCount();

        System.setProperty("username", username);

        WordCountOptions sparkContextOptions = PipelineOptionsFactory.as(WordCountOptions.class);

        sparkContextOptions.setSparkMaster(sparkMaster);
        sparkContextOptions.setAppName("BeamReadOperations");
        sparkContextOptions.setStreaming(false);
        sparkContextOptions.setRunner(SparkRunner.class);
        sparkContextOptions.setEnableSparkMetricSinks(true);
        sparkContextOptions.setJobName("BeamReadOperations Job");
        sparkContextOptions.as(SparkPipelineOptions.class);
        sparkContextOptions.setCheckpointDurationMillis(1000L);
        sparkContextOptions.setMaxRecordsPerBatch(1000L);
        sparkContextOptions.setOptionsId(1000L);

        sparkContextOptions.setProvidedSparkContext(wc.create().sparkContext());
        sparkContextOptions.setUsesProvidedSparkContext(true);

        Pipeline p = Pipeline.create(sparkContextOptions);

        p.apply("ReadLines", TextIO.Read.from(sparkContextOptions.getInputFile()))
                .apply(new CountWords())
                .apply(MapElements.via(new FormatAsTextFn()))
                .apply("WriteCounts", TextIO.Write.to(sparkContextOptions.getOutput()));
        p.run().waitUntilFinish();
    }

    public JavaStreamingContext create() {
        return createJavaStreamingContext();
    }

    private static JavaSparkContext createJavaSparkContext() {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName(appName);
        sparkConf.setMaster(sparkMaster);
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");

        JavaSerializer javaSerializer = new JavaSerializer(sparkConf);
        javaSerializer.newInstance();
        javaSerializer.org$apache$spark$serializer$JavaSerializer$$counterReset();

        SparkContext sparkContext = new SparkContext(sparkMaster, appName, sparkConf);

        longAccumulator = sparkContext.longAccumulator();
        longAccumulator.setValue(100L);
        JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sparkContext);
        return javaSparkContext;
    }

    private JavaStreamingContext createJavaStreamingContext() {
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(createJavaSparkContext(),
                new Duration(10000L));
        return javaStreamingContext;
    }
}



Just for info: I am using these dependencies:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>0.6.0</version>
</dependency>

<!-- <dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-hdfs</artifactId>
  <version>0.6.0</version>
</dependency> -->

<!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-core -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>0.6.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.6.3</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.3</version>
</dependency>


And this is the spark-submit command I use:


spark-submit --class com.impressico.apache.beam.word.count.vs.WordCount \
  --master spark://192.168.1.1:6066 --deploy-mode cluster \
  /home/username/beamWorks/WordCount.jar --runner=SparkRunner --name WordCount


Any inputs would be very helpful.


Thanks,

Vaibhav Sharma