Yes, Spark streaming support is still experimental, and this issue exists in Beam 0.6.0. This has since been fixed, and the fix will be part of the upcoming release. Since this isn't the first time a user has encountered this, I've created a JIRA ticket to give the issue better visibility: https://issues.apache.org/jira/browse/BEAM-2106

Thanks for reaching out! Please feel free to try out your pipeline using the Beam master branch or one of the nightly SNAPSHOT builds.
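If you go the SNAPSHOT route with Maven, it should be enough to add the Apache snapshots repository to your pom and bump the Beam version on all four artifacts. Roughly like this (0.7.0-SNAPSHOT is my guess at the current development version, so please double-check it against master):

<repositories>
    <repository>
        <!-- Apache snapshots repository, where nightly Beam SNAPSHOT artifacts are published -->
        <id>apache.snapshots</id>
        <url>https://repository.apache.org/content/repositories/snapshots</url>
        <releases><enabled>false</enabled></releases>
        <snapshots><enabled>true</enabled></snapshots>
    </repository>
</repositories>

<!-- then the same version bump for each Beam artifact, e.g.: -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-spark</artifactId>
    <version>0.7.0-SNAPSHOT</version>
</dependency>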
On Thu, Apr 27, 2017 at 9:58 AM 4498237@qq <4498...@qq.com> wrote:

> Here is my maven configuration, thank you.
>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-sdks-java-core</artifactId>
>     <version>0.6.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-runners-direct-java</artifactId>
>     <version>0.6.0</version>
>     <scope>runtime</scope>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-sdks-java-io-kafka</artifactId>
>     <version>0.6.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-runners-spark</artifactId>
>     <version>0.6.0</version>
> </dependency>
>
> On 26 Apr 2017, at 6:58 PM, Aviem Zur <aviem...@gmail.com> wrote:
>
> Hi,
>
> Can you please share which version of Beam you are using?
>
> On Wed, Apr 26, 2017 at 1:51 PM 4498237@qq <4498...@qq.com> wrote:
>
>> Hi, here is my program, which uses additional outputs in Apache Beam, and the result:
>>
>> public class DataExtraction2 {
>>     public static void main(String[] args) {
>>         System.setProperty("hadoop.home.dir", "C://hadoop/hadoop-2.6.1");
>>         SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
>>         options.setSparkMaster("local[4]");
>>         // options.setCheckpointDir("./checkpoint");
>>         options.setRunner(SparkRunner.class);
>>         // options.setRunner(DirectRunner.class);
>>         options.setStorageLevel("MEMORY_ONLY");
>>         options.setAppName("testMavenDependency");
>>         options.setBatchIntervalMillis(1000L);
>>         options.setEnableSparkMetricSinks(false);
>>         Pipeline p = Pipeline.create(options);
>>         List<String> topics = Arrays.asList("beamOnSparkTest".split(","));
>>
>>         final TupleTag<String> rawDataTag = new TupleTag<String>() {};
>>         final TupleTag<String> exceptionTag = new TupleTag<String>() {};
>>         final TupleTag<String> riskEventLogTag = new TupleTag<String>() {};
>>         final TupleTag<String> statisticsTag = new TupleTag<String>() {};
>>         final TupleTag<String> errorTargetLogTag = new TupleTag<String>() {};
>>         final TupleTag<String> equipmentLogTag = new TupleTag<String>() {};
>>         final TupleTag<String> performanceLogTag = new TupleTag<String>() {};
>>
>>         PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
>>                 .withBootstrapServers("172.17.1.138:9092,172.17.1.137:9092")
>>                 .withTopics(topics)
>>                 .withConsumerFactoryFn(new CafintechConsumerFactoryFn())
>>                 .withKeyCoder(VoidCoder.of())
>>                 .withValueCoder(StringUtf8Coder.of())
>>                 .withoutMetadata()
>>         ).apply(Values.<String>create());
>>
>>         PCollectionTuple results = rawData.apply(
>>                 ParDo.withOutputTags(rawDataTag,
>>                         TupleTagList.of(exceptionTag)
>>                                 .and(riskEventLogTag)
>>                                 .and(statisticsTag)
>>                                 .and(errorTargetLogTag)
>>                                 .and(equipmentLogTag)
>>                                 .and(performanceLogTag))
>>                         .of(new DoFn<String, String>() {
>>                             @ProcessElement
>>                             public void processElement(ProcessContext c) {
>>                                 String idCoop = "";
>>                                 int eventType = 0;
>>                                 int osPlatformType = -1;
>>                                 String innerDecision = "";
>>                                 String outterDecision = "";
>>                                 // Date appTime = new Date();
>>                                 String eventId = "";
>>                                 // String strategyList = "";
>>                                 String uuid = "";
>>                                 String phoneNo = "";
>>                                 int equipmentType = -1;
>>                                 int antiFraudTime = -1;
>>                                 ......
>>                             }
>>                         }));
>>
>>         p.run().waitUntilFinish();
>>     }
>> }
>>
>> When I run this program, I get this result:
>>
>> .....
>> ....
>> 2017-04-26 15:06:13,077 [pool-1-thread-1] [org.apache.spark.streaming.StreamingContext] [ERROR] - Error starting the context, marking it as stopped
>> java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
>> org.apache.beam.runners.spark.translation.EvaluationContext
>> Serialization stack:
>> - object not serializable (class: org.apache.beam.runners.spark.translation.EvaluationContext, value: org.apache.beam.runners.spark.translation.EvaluationContext@2807813e)
>> - field (class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1, name: val$context, type: class org.apache.beam.runners.spark.translation.EvaluationContext)
>> - object (class org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1, org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1@560cd8a8)
>> - field (class: org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, name: transformFunc$3, type: interface org.apache.spark.api.java.function.Function)
>> - object (class org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, <function1>)
>> - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, name: cleanedF$2, type: interface scala.Function1)
>> - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, <function2>)
>> - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, name: cleanedF$3, type: interface scala.Function2)
>> - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, <function2>)
>> - field (class: org.apache.spark.streaming.dstream.TransformedDStream, name: transformFunc, type: interface scala.Function2)
>> - object (class org.apache.spark.streaming.dstream.TransformedDStream, org.apache.spark.streaming.dstream.TransformedDStream@3ea9e1e5)
>> - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
>> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
>>   0 checkpoint files
>>   ])
>> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
>> - object (class org.apache.spark.streaming.dstream.InternalMapWithStateDStream, org.apache.spark.streaming.dstream.InternalMapWithStateDStream@23ab764d)
>> - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
>> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
>>   0 checkpoint files
>>   ])
>> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
>> - object (class org.apache.spark.streaming.dstream.FilteredDStream, org.apache.spark.streaming.dstream.FilteredDStream@5bbb0240)
>> - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
>> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
>>   0 checkpoint files
>>   ])
>> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
>> - object (class org.apache.spark.streaming.dstream.MapWithStateDStreamImpl, org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@24211bca)
>> - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
>> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
>>   0 checkpoint files
>> ...
>> ....
>>
>> If there is only one main output, the program works OK. Can you tell me why?
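To answer the "can you tell me why?" above: the serialization stack points at the cause. When the pipeline has additional outputs, the 0.6.0 Spark runner captures its EvaluationContext, which is not serializable, inside a DStream transform function; because checkpointing is enabled, Spark then tries to serialize the whole DStream graph and fails. With a single main output that path is apparently not taken, which is why the pipeline runs in that case.

Once you are on a build that contains the fix, a stripped-down multi-output pipeline is a quick way to sanity-check the translation before re-enabling your full job. A minimal sketch against the 0.6.0-era API (class and tag names here are made up; Create.of replaces your Kafka source so it is self-contained, but since Create is bounded this only exercises the multi-output translation, and reproducing the checkpointing failure itself still needs an unbounded source such as your KafkaIO read):

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class MultiOutputSmokeTest {
    public static void main(String[] args) {
        // Anonymous subclasses so each tag keeps its element type at runtime.
        final TupleTag<String> mainTag = new TupleTag<String>() {};
        final TupleTag<String> errorTag = new TupleTag<String>() {};

        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        PCollectionTuple results = p
                .apply(Create.of(Arrays.asList("ok:1", "bad:2", "ok:3")))
                .apply(ParDo.withOutputTags(mainTag, TupleTagList.of(errorTag))
                        .of(new DoFn<String, String>() {
                            @ProcessElement
                            public void processElement(ProcessContext c) {
                                if (c.element().startsWith("ok")) {
                                    c.output(c.element());               // main output
                                } else {
                                    c.sideOutput(errorTag, c.element()); // additional output
                                }
                            }
                        }));

        // Each output is consumed from the tuple by its tag.
        PCollection<String> okRecords = results.get(mainTag);
        PCollection<String> badRecords = results.get(errorTag);

        p.run().waitUntilFinish();
    }
}

Run it with --runner=SparkRunner (and beam-runners-spark on the classpath) to go through the Spark translation path. Note that the side-output methods were being renamed on master around this time, so if your SNAPSHOT no longer has sideOutput, adjust the calls accordingly.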