Hi,

Can you please share which version of Beam you are using?
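In the meantime, here is a minimal sketch of the additional-outputs pattern your pipeline appears to follow, written against the pre-2.0 ParDo.withOutputTags(...).of(...) style that your snippet uses. The class name, the in-memory Create source, and the error-tag routing are illustrative only, not taken from your code:

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class MultiOutputSketch {  // illustrative name, not from the original pipeline
    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Declare tags as anonymous subclasses so element type information is retained.
        final TupleTag<String> mainTag = new TupleTag<String>() {};
        final TupleTag<String> errorTag = new TupleTag<String>() {};

        // A bounded in-memory source stands in for the Kafka source here.
        PCollection<String> input = p.apply(Create.of(Arrays.asList("a", "error:b", "c")));

        PCollectionTuple results = input.apply(
                ParDo.withOutputTags(mainTag, TupleTagList.of(errorTag))
                        .of(new DoFn<String, String>() {
                            @ProcessElement
                            public void processElement(ProcessContext c) {
                                if (c.element().startsWith("error:")) {
                                    // Pre-2.0 SDKs route to an additional output with sideOutput(tag, value);
                                    // Beam 2.x renamed this to output(tag, value).
                                    c.sideOutput(errorTag, c.element());
                                } else {
                                    c.output(c.element());  // main output
                                }
                            }
                        }));

        results.get(mainTag);   // main output PCollection
        results.get(errorTag);  // additional output PCollection

        p.run().waitUntilFinish();
    }
}

If that matches what you are doing, the multi-output wiring itself looks fine; the classes in your serialization stack (org.apache.beam.runners.spark.translation.*) come from the Spark runner's streaming translation, which is why the exact Beam version is the first thing to pin down.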
On Wed, Apr 26, 2017 at 1:51 PM 4498237@qq <[email protected]> wrote:

> hi, here is my program, which uses additional outputs in Apache Beam, and
> the result:
>
> public class DataExtraction2 {
>     public static void main(String[] args) {
>         System.setProperty("hadoop.home.dir", "C://hadoop/hadoop-2.6.1");
>         SparkPipelineOptions options =
>                 PipelineOptionsFactory.as(SparkPipelineOptions.class);
>         options.setSparkMaster("local[4]");
>         // options.setCheckpointDir("./checkpoint");
>         options.setRunner(SparkRunner.class);
>         // options.setRunner(DirectRunner.class);
>         options.setStorageLevel("MEMORY_ONLY");
>         options.setAppName("testMavenDependency");
>         options.setBatchIntervalMillis(1000L);
>         options.setEnableSparkMetricSinks(false);
>         Pipeline p = Pipeline.create(options);
>         List<String> topics = Arrays.asList("beamOnSparkTest".split(","));
>
>         final TupleTag<String> rawDataTag = new TupleTag<String>() {};
>         final TupleTag<String> exceptionTag = new TupleTag<String>() {};
>         final TupleTag<String> riskEventLogTag = new TupleTag<String>() {};
>         final TupleTag<String> statisticsTag = new TupleTag<String>() {};
>         final TupleTag<String> errorTargetLogTag = new TupleTag<String>() {};
>         final TupleTag<String> equipmentLogTag = new TupleTag<String>() {};
>         final TupleTag<String> performanceLogTag = new TupleTag<String>() {};
>
>         PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
>                 .withBootstrapServers("172.17.1.138:9092,172.17.1.137:9092")
>                 .withTopics(topics)
>                 .withConsumerFactoryFn(new CafintechConsumerFactoryFn())
>                 .withKeyCoder(VoidCoder.of())
>                 .withValueCoder(StringUtf8Coder.of())
>                 .withoutMetadata()
>         ).apply(Values.<String>create());
>
>         PCollectionTuple results = rawData.apply(
>                 ParDo.withOutputTags(rawDataTag,
>                         TupleTagList.of(exceptionTag)
>                                 .and(riskEventLogTag)
>                                 .and(statisticsTag)
>                                 .and(errorTargetLogTag)
>                                 .and(equipmentLogTag)
>                                 .and(performanceLogTag))
>                         .of(new DoFn<String, String>() {
>                             @ProcessElement
>                             public void processElement(ProcessContext c) {
>                                 String idCoop = "";
>                                 int eventType = 0;
>                                 int osPlatformType = -1;
>                                 String innerDecision = "";
>                                 String outterDecision = "";
>                                 // Date appTime = new Date();
>                                 String eventId = "";
>                                 // String strategyList = "";
>                                 String uuid = "";
>                                 String phoneNo = "";
>                                 int equipmentType = -1;
>                                 int antiFraudTime = -1;
>                                 ......
>                             }
>                         }));
>
>         p.run().waitUntilFinish();
>     }
> }
>
> When I run this program, I get this result:
> .....
> ....
> 2017-04-26 15:06:13,077 [pool-1-thread-1]
> [org.apache.spark.streaming.StreamingContext] [ERROR] - Error starting the
> context, marking it as stopped
> java.io.NotSerializableException: DStream checkpointing has been enabled
> but the DStreams with their functions are not serializable
> org.apache.beam.runners.spark.translation.EvaluationContext
> Serialization stack:
>     - object not serializable (class:
>       org.apache.beam.runners.spark.translation.EvaluationContext, value:
>       org.apache.beam.runners.spark.translation.EvaluationContext@2807813e)
>     - field (class:
>       org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1,
>       name: val$context, type: class
>       org.apache.beam.runners.spark.translation.EvaluationContext)
>     - object (class
>       org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1,
>       org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1@560cd8a8)
>     - field (class:
>       org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1,
>       name: transformFunc$3, type: interface
>       org.apache.spark.api.java.function.Function)
>     - object (class
>       org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1,
>       <function1>)
>     - field (class:
>       org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21,
>       name: cleanedF$2, type: interface scala.Function1)
>     - object (class
>       org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21,
>       <function2>)
>     - field (class:
>       org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5,
>       name: cleanedF$3, type: interface scala.Function2)
>     - object (class
>       org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5,
>       <function2>)
>     - field (class: org.apache.spark.streaming.dstream.TransformedDStream,
>       name: transformFunc, type: interface scala.Function2)
>     - object (class org.apache.spark.streaming.dstream.TransformedDStream,
>       org.apache.spark.streaming.dstream.TransformedDStream@3ea9e1e5)
>     - writeObject data (class:
>       org.apache.spark.streaming.dstream.DStreamCheckpointData)
>     - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData,
>       [0 checkpoint files])
>     - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
>     - object (class
>       org.apache.spark.streaming.dstream.InternalMapWithStateDStream,
>       org.apache.spark.streaming.dstream.InternalMapWithStateDStream@23ab764d)
>     - writeObject data (class:
>       org.apache.spark.streaming.dstream.DStreamCheckpointData)
>     - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData,
>       [0 checkpoint files])
>     - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
>     - object (class org.apache.spark.streaming.dstream.FilteredDStream,
>       org.apache.spark.streaming.dstream.FilteredDStream@5bbb0240)
>     - writeObject data (class:
>       org.apache.spark.streaming.dstream.DStreamCheckpointData)
>     - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData,
>       [0 checkpoint files])
>     - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
>     - object (class
>       org.apache.spark.streaming.dstream.MapWithStateDStreamImpl,
>       org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@24211bca)
>     - writeObject data (class:
>       org.apache.spark.streaming.dstream.DStreamCheckpointData)
>     - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData,
>       [0 checkpoint files
> ...
> ....
> If there is only one main output, the program works OK.
> Can you tell me why?
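To double-check the version from the same classpath the job runs with, something like the following should print it (assuming the SDK jar on your classpath ships org.apache.beam.sdk.util.ReleaseInfo; recent releases do, but I have not verified every early 0.x release). Otherwise, the beam-sdks-java-core entry in mvn dependency:tree will tell you.

import org.apache.beam.sdk.util.ReleaseInfo;

public class PrintBeamVersion {
    public static void main(String[] args) {
        // Prints the Beam SDK version found on the classpath.
        System.out.println(ReleaseInfo.getReleaseInfo().getVersion());
    }
}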
