Hi, I am having trouble with additional outputs on SparkRunner. Here is my code: when I use DirectRunner everything runs OK, but if I replace DirectRunner with SparkRunner the code does not run normally.
public class UnifiedDataExtraction {

    private static TupleTag<String> rawDataTag = new TupleTag<String>() {};
    private static TupleTag<String> exceptionTag = new TupleTag<String>() {};

    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", ConstantsOwn.HADOOP_HOME);

        SparkPipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
        options.setSparkMaster(ConstantsOwn.SPARK_MASTER);
        options.setRunner(SparkRunner.class);
        // options.setRunner(DirectRunner.class);
        options.setStorageLevel("MEMORY_ONLY");
        options.setAppName(ConstantsOwn.SPARK_APPNAME);
        options.setBatchIntervalMillis(1000L);
        options.setEnableSparkMetricSinks(false);

        Pipeline p = Pipeline.create(options);

        List<String> topics = Arrays.asList(ConstantsOwn.KAFKA_TOPIC_ANTIFRAUD.split(","));

        PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
                .withBootstrapServers(ConstantsOwn.KAFKA_ADDRESS)
                .withTopics(topics)
                // .withConsumerFactoryFn(new CafintechConsumerFactoryFn())
                .withKeyCoder(VoidCoder.of())
                .withValueCoder(StringUtf8Coder.of())
                .withKeyDeserializer(VoidDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata()
        ).apply(Values.<String>create());

        rawData.apply(ParDo.of(SimpleViewDoFn.of(true))); // ① simply prints each element of rawData; runs normally

        PCollectionTuple results = rawData.apply("logAnatomyTest", // ②
                ParDo.of(
                        new DoFn<String, String>() {
                            @ProcessElement
                            public void process(ProcessContext c) {
                                String element = c.element();
                                System.out.println("====" + element);
                                if (!element.equals("EOF")) {
                                    c.output(c.element());
                                }
                            }
                        }
                ).withOutputTags(rawDataTag, TupleTagList.of(exceptionTag))
        );

        p.run().waitUntilFinish();
    }
}

In the code above, the transform marked ① runs normally, but from ② I get nothing.

Here is my Beam version:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>0.7.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>0.7.0-SNAPSHOT</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>0.7.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-spark</artifactId>
    <version>0.7.0-SNAPSHOT</version>
</dependency>

Someone please help me.

On 2017/4/28 4:43:23, Aviem Zur <aviem...@gmail.com> wrote:

Yes. Spark streaming support is still experimental and this issue exists in Beam 0.6.0.
This has since been fixed and the fix will be a part of the upcoming release.
Since this isn't the first time a user has encountered this, I've created a JIRA ticket for better visibility of this issue: https://issues.apache.org/jira/browse/BEAM-2106

Thanks for reaching out! Please feel free to try out your pipeline using the Beam master branch or one of the nightly SNAPSHOT builds.
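A nightly SNAPSHOT build would typically be pulled from the Apache snapshots repository; a minimal POM sketch, assuming the standard ASF snapshot location (the repository id here is arbitrary):

<repositories>
    <repository>
        <id>apache-snapshots</id>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>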
On Thu, Apr 27, 2017 at 9:58 AM 4498237@qq <4498...@qq.com> wrote:

Here is my maven configuration, thank you.

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>0.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>0.6.0</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>0.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-spark</artifactId>
    <version>0.6.0</version>
</dependency>

On 26 Apr 2017, at 6:58 PM, Aviem Zur <aviem...@gmail.com> wrote:

Hi,
Can you please share which version of Beam you are using?

On Wed, Apr 26, 2017 at 1:51 PM 4498237@qq <4498...@qq.com> wrote:

Hi, here is my program for additional outputs with Apache Beam, and the result:

public class DataExtraction2 {
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "C://hadoop/hadoop-2.6.1");

        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        options.setSparkMaster("local[4]");
        // options.setCheckpointDir("./checkpoint");
        options.setRunner(SparkRunner.class);
        // options.setRunner(DirectRunner.class);
        options.setStorageLevel("MEMORY_ONLY");
        options.setAppName("testMavenDependency");
        options.setBatchIntervalMillis(1000L);
        options.setEnableSparkMetricSinks(false);

        Pipeline p = Pipeline.create(options);

        List<String> topics = Arrays.asList("beamOnSparkTest".split(","));

        final TupleTag<String> rawDataTag = new TupleTag<String>() {};
        final TupleTag<String> exceptionTag = new TupleTag<String>() {};
        final TupleTag<String> riskEventLogTag = new TupleTag<String>() {};
        final TupleTag<String> statisticsTag = new TupleTag<String>() {};
        final TupleTag<String> errorTargetLogTag = new TupleTag<String>() {};
        final TupleTag<String> equipmentLogTag = new TupleTag<String>() {};
        final TupleTag<String> performanceLogTag = new TupleTag<String>() {};

        PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
                .withBootstrapServers("172.17.1.138:9092,172.17.1.137:9092")
                .withTopics(topics)
                .withConsumerFactoryFn(new CafintechConsumerFactoryFn())
                .withKeyCoder(VoidCoder.of())
                .withValueCoder(StringUtf8Coder.of())
                .withoutMetadata()
        ).apply(Values.<String>create());

        PCollectionTuple results = rawData.apply(
                ParDo.withOutputTags(rawDataTag, TupleTagList.of(exceptionTag)
                        .and(riskEventLogTag)
                        .and(statisticsTag)
                        .and(errorTargetLogTag)
                        .and(equipmentLogTag)
                        .and(performanceLogTag))
                        .of(new DoFn<String, String>() {
                            @ProcessElement
                            public void processElement(ProcessContext c) {
                                String idCoop = "";
                                int eventType = 0;
                                int osPlatformType = -1;
                                String innerDecision = "";
                                String outterDecision = "";
                                // Date appTime = new Date();
                                String eventId = "";
                                // String strategyList = "";
                                String uuid = "";
                                String phoneNo = "";
                                int equipmentType = -1;
                                int antiFraudTime = -1;
                                ......
                            }
                        }));

        p.run().waitUntilFinish();
    }
}

When I run this program, I get this result:
.....
....
2017-04-26 15:06:13,077 [pool-1-thread-1] [org.apache.spark.streaming.StreamingContext] [ERROR] - Error starting the context, marking it as stopped
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.beam.runners.spark.translation.EvaluationContext
Serialization stack:
    - object not serializable (class: org.apache.beam.runners.spark.translation.EvaluationContext, value: org.apache.beam.runners.spark.translation.EvaluationContext@2807813e)
    - field (class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1, name: val$context, type: class org.apache.beam.runners.spark.translation.EvaluationContext)
    - object (class org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1, org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1@560cd8a8)
    - field (class: org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, name: transformFunc$3, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, <function1>)
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, name: cleanedF$2, type: interface scala.Function1)
    - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, <function2>)
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, name: cleanedF$3, type: interface scala.Function2)
    - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, <function2>)
    - field (class: org.apache.spark.streaming.dstream.TransformedDStream, name: transformFunc, type: interface scala.Function2)
    - object (class org.apache.spark.streaming.dstream.TransformedDStream, org.apache.spark.streaming.dstream.TransformedDStream@3ea9e1e5)
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
    - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ])
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
    - object (class org.apache.spark.streaming.dstream.InternalMapWithStateDStream, org.apache.spark.streaming.dstream.InternalMapWithStateDStream@23ab764d)
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
    - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ])
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
    - object (class org.apache.spark.streaming.dstream.FilteredDStream, org.apache.spark.streaming.dstream.FilteredDStream@5bbb0240)
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
    - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ])
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
    - object (class org.apache.spark.streaming.dstream.MapWithStateDStreamImpl, org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@24211bca)
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
    - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ...
....

If there is only one main output, the program works OK. Can you tell me why?
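For reference, the tagged outputs are normally pulled out of the returned PCollectionTuple and consumed by downstream transforms; a minimal sketch against the tags used above (the printing DoFn is only illustrative and not part of the original program):

// Retrieve the main output and one of the additional outputs from the tuple.
PCollection<String> mainOutput = results.get(rawDataTag);
PCollection<String> exceptions = results.get(exceptionTag);

// Print whatever arrives on the additional output, assuming the producing DoFn
// actually emits elements to exceptionTag.
exceptions.apply("printExceptions", ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void process(ProcessContext c) {
        System.out.println("exception output: " + c.element());
    }
}));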