By "cannot run normally" do you mean you get an exception? We recently had a bug on master in which streaming pipelines containing `ParDo` with multiple outputs ran into `NullPointerException`. This was fixed here: https://issues.apache.org/jira/browse/BEAM-2029 Is this what you're facing? If so does pulling master and rebuilding help?
On Thu, May 4, 2017 at 5:37 AM zhenglin.Tian <zhenglin.t...@cafintech.com> wrote:

> Hi, I have a problem with additional outputs on the SparkRunner.
> Here is my code. With the DirectRunner everything runs OK, but if I
> replace the DirectRunner with the SparkRunner, the code can't run normally.
>
> public class UnifiedDataExtraction {
>
>     private static TupleTag<String> rawDataTag = new TupleTag<String>() {
>     };
>
>     private static TupleTag<String> exceptionTag = new TupleTag<String>() {
>     };
>
>     public static void main(String[] args) {
>         System.setProperty("hadoop.home.dir", ConstantsOwn.HADOOP_HOME);
>
>         SparkPipelineOptions options =
>                 PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
>         options.setSparkMaster(ConstantsOwn.SPARK_MASTER);
>         options.setRunner(SparkRunner.class);
>         // options.setRunner(DirectRunner.class);
>         options.setStorageLevel("MEMORY_ONLY");
>         options.setAppName(ConstantsOwn.SPARK_APPNAME);
>         options.setBatchIntervalMillis(1000L);
>         options.setEnableSparkMetricSinks(false);
>         Pipeline p = Pipeline.create(options);
>
>         List<String> topics =
>                 Arrays.asList(ConstantsOwn.KAFKA_TOPIC_ANTIFRAUD.split(","));
>
>         PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
>                 .withBootstrapServers(ConstantsOwn.KAFKA_ADDRESS)
>                 .withTopics(topics)
>                 // .withConsumerFactoryFn(new CafintechConsumerFactoryFn())
>                 .withKeyCoder(VoidCoder.of())
>                 .withValueCoder(StringUtf8Coder.of())
>                 .withKeyDeserializer(VoidDeserializer.class)
>                 .withValueDeserializer(StringDeserializer.class)
>                 .withoutMetadata()
>         ).apply(Values.<String>create());
>
>         // Simply prints each element of rawData. Runs normally. ①
>         rawData.apply(ParDo.of(SimpleViewDoFn.of(true)));
>         PCollectionTuple results = rawData.apply("logAnatomyTest", // ②
>                 ParDo.of(
>                         new DoFn<String, String>() {
>                             @ProcessElement
>                             public void process(ProcessContext c) {
>                                 String element = c.element();
>                                 System.out.println("====" + element);
>                                 if (!element.equals("EOF")) {
>                                     c.output(c.element());
>                                 }
>                             }
>                         }
>                 ).withOutputTags(rawDataTag, TupleTagList.of(exceptionTag))
>         );
>         p.run().waitUntilFinish();
>     }
> }
>
> In the previous code, the transform marked ① runs normally, but from ②
> I can't get anything.
>
> Here is my Beam version:
>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-sdks-java-core</artifactId>
>     <version>0.7.0-SNAPSHOT</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-runners-direct-java</artifactId>
>     <version>0.7.0-SNAPSHOT</version>
>     <scope>runtime</scope>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-sdks-java-io-kafka</artifactId>
>     <version>0.7.0-SNAPSHOT</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-runners-spark</artifactId>
>     <version>0.7.0-SNAPSHOT</version>
> </dependency>
>
> Someone please help me.
>
> On 2017/4/28 4:43:23, Aviem Zur <aviem...@gmail.com> wrote:
>
> Yes. Spark streaming support is still experimental and this issue exists
> in Beam 0.6.0.
>
> This has since been fixed and the fix will be part of the upcoming
> release.
>
> Since this isn't the first time a user has encountered this, I've created
> a JIRA ticket for better visibility of this issue:
> https://issues.apache.org/jira/browse/BEAM-2106
>
> Thanks for reaching out!
> Please feel free to try out your pipeline using the Beam master branch or
> one of the nightly SNAPSHOT builds.
>
> On Thu, Apr 27, 2017 at 9:58 AM 4498237@qq <4498...@qq.com> wrote:
>
>> Here is my maven configuration, thank you.
>>
>> <dependency>
>>     <groupId>org.apache.beam</groupId>
>>     <artifactId>beam-sdks-java-core</artifactId>
>>     <version>0.6.0</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.beam</groupId>
>>     <artifactId>beam-runners-direct-java</artifactId>
>>     <version>0.6.0</version>
>>     <scope>runtime</scope>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.beam</groupId>
>>     <artifactId>beam-sdks-java-io-kafka</artifactId>
>>     <version>0.6.0</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.beam</groupId>
>>     <artifactId>beam-runners-spark</artifactId>
>>     <version>0.6.0</version>
>> </dependency>
>>
>> On 26 Apr 2017, at 6:58 PM, Aviem Zur <aviem...@gmail.com> wrote:
>>
>> Hi,
>>
>> Can you please share which version of Beam you are using?
>>
>> On Wed, Apr 26, 2017 at 1:51 PM 4498237@qq <4498...@qq.com> wrote:
>>
>>> Hi, here is my program about additional outputs for Apache Beam, and
>>> the result:
>>>
>>> public class DataExtraction2 {
>>>     public static void main(String[] args) {
>>>         System.setProperty("hadoop.home.dir", "C://hadoop/hadoop-2.6.1");
>>>         SparkPipelineOptions options =
>>>                 PipelineOptionsFactory.as(SparkPipelineOptions.class);
>>>         options.setSparkMaster("local[4]");
>>>         // options.setCheckpointDir("./checkpoint");
>>>         options.setRunner(SparkRunner.class);
>>>         // options.setRunner(DirectRunner.class);
>>>         options.setStorageLevel("MEMORY_ONLY");
>>>         options.setAppName("testMavenDependency");
>>>         options.setBatchIntervalMillis(1000L);
>>>         options.setEnableSparkMetricSinks(false);
>>>         Pipeline p = Pipeline.create(options);
>>>         List<String> topics = Arrays.asList("beamOnSparkTest".split(","));
>>>
>>>         final TupleTag<String> rawDataTag = new TupleTag<String>() {
>>>         };
>>>         final TupleTag<String> exceptionTag = new TupleTag<String>() {
>>>         };
>>>         final TupleTag<String> riskEventLogTag = new TupleTag<String>() {
>>>         };
>>>         final TupleTag<String> statisticsTag = new TupleTag<String>() {
>>>         };
>>>         final TupleTag<String> errorTargetLogTag = new TupleTag<String>() {
>>>         };
>>>         final TupleTag<String> equipmentLogTag = new TupleTag<String>() {
>>>         };
>>>         final TupleTag<String> performanceLogTag = new TupleTag<String>() {
>>>         };
>>>         PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
>>>                 .withBootstrapServers("172.17.1.138:9092,172.17.1.137:9092")
>>>                 .withTopics(topics)
>>>                 .withConsumerFactoryFn(new CafintechConsumerFactoryFn())
>>>                 .withKeyCoder(VoidCoder.of())
>>>                 .withValueCoder(StringUtf8Coder.of())
>>>                 .withoutMetadata()
>>>         ).apply(Values.<String>create());
>>>         PCollectionTuple results = rawData.apply(
>>>                 ParDo.withOutputTags(rawDataTag,
>>>                         TupleTagList.of(exceptionTag)
>>>                                 .and(riskEventLogTag)
>>>                                 .and(statisticsTag)
>>>                                 .and(errorTargetLogTag)
>>>                                 .and(equipmentLogTag)
>>>                                 .and(performanceLogTag))
>>>                         .of(new DoFn<String, String>() {
>>>                             @ProcessElement
>>>                             public void processElement(ProcessContext c) {
>>>                                 String idCoop = "";
>>>                                 int eventType = 0;
>>>                                 int osPlatformType = -1;
>>>                                 String innerDecision = "";
>>>                                 String outterDecision = "";
>>>                                 // Date appTime = new Date();
>>>                                 String eventId = "";
>>>                                 // String strategyList = "";
>>>                                 String uuid = "";
>>>                                 String phoneNo = "";
>>>                                 int equipmentType = -1;
>>>                                 int antiFraudTime = -1;
>>>                                 ......
>>>                             }
>>>                         }));
>>>         p.run().waitUntilFinish();
>>>     }
>>> }
>>>
>>> When I run this program, I get this result:
>>>
>>> .....
>>> ....
>>> 2017-04-26 15:06:13,077 [pool-1-thread-1]
>>> [org.apache.spark.streaming.StreamingContext] [ERROR] - Error starting
>>> the context, marking it as stopped
>>> java.io.NotSerializableException: DStream checkpointing has been enabled
>>> but the DStreams with their functions are not serializable
>>> org.apache.beam.runners.spark.translation.EvaluationContext
>>> Serialization stack:
>>> - object not serializable (class:
>>>   org.apache.beam.runners.spark.translation.EvaluationContext, value:
>>>   org.apache.beam.runners.spark.translation.EvaluationContext@2807813e)
>>> - field (class:
>>>   org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1,
>>>   name: val$context, type: class
>>>   org.apache.beam.runners.spark.translation.EvaluationContext)
>>> - object (class
>>>   org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1,
>>>   org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1@560cd8a8)
>>> - field (class:
>>>   org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1,
>>>   name: transformFunc$3, type: interface
>>>   org.apache.spark.api.java.function.Function)
>>> - object (class
>>>   org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1,
>>>   <function1>)
>>> - field (class:
>>>   org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21,
>>>   name: cleanedF$2, type: interface scala.Function1)
>>> - object (class
>>>   org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21,
>>>   <function2>)
>>> - field (class:
>>>   org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5,
>>>   name: cleanedF$3, type: interface scala.Function2)
>>> - object (class
>>>   org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5,
>>>   <function2>)
>>> - field (class: org.apache.spark.streaming.dstream.TransformedDStream,
>>>   name: transformFunc, type: interface scala.Function2)
>>> - object (class org.apache.spark.streaming.dstream.TransformedDStream,
>>>   org.apache.spark.streaming.dstream.TransformedDStream@3ea9e1e5)
>>> - writeObject data (class:
>>>   org.apache.spark.streaming.dstream.DStreamCheckpointData)
>>> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
>>>   0 checkpoint files
>>>   ])
>>> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
>>> - object (class
>>>   org.apache.spark.streaming.dstream.InternalMapWithStateDStream,
>>>   org.apache.spark.streaming.dstream.InternalMapWithStateDStream@23ab764d)
>>> - writeObject data (class:
>>>   org.apache.spark.streaming.dstream.DStreamCheckpointData)
>>> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
>>>   0 checkpoint files
>>>   ])
>>> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
>>> - object (class org.apache.spark.streaming.dstream.FilteredDStream,
>>>   org.apache.spark.streaming.dstream.FilteredDStream@5bbb0240)
>>> - writeObject data (class:
>>>   org.apache.spark.streaming.dstream.DStreamCheckpointData)
>>> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
>>>   0 checkpoint files
>>>   ])
>>> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
>>> - object (class
>>>   org.apache.spark.streaming.dstream.MapWithStateDStreamImpl,
>>>   org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@24211bca)
>>> - writeObject data (class:
>>>   org.apache.spark.streaming.dstream.DStreamCheckpointData)
>>> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
>>>   0 checkpoint files
>>> ...
>>> ....
>>>
>>> If there is only one main output, the program works OK.
>>> Can you tell me why?
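Regarding the suggestion above to try one of the nightly SNAPSHOT builds: a sketch of the pom.xml additions that should suffice (the repository id and name are illustrative; the URL is the standard Apache snapshots repository, and the same -SNAPSHOT version goes on each of the Beam artifacts listed earlier in the thread):

    <!-- Resolve nightly SNAPSHOT artifacts from the Apache snapshots repository. -->
    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!-- Then bump each Beam dependency to the SNAPSHOT version, e.g.: -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-spark</artifactId>
        <version>0.7.0-SNAPSHOT</version>
    </dependency>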