Here is my Maven configuration, thank you.

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>0.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>0.6.0</version>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>0.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>0.6.0</version>
</dependency>
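
One note on the dependencies: the POM above does not list Spark itself, and as far as I know the 0.6.0 beam-runners-spark artifact expects the application to supply Spark on its own classpath when running with a local master. The entries below are only a sketch of what I assume that looks like; the 1.6.3 version and the _2.10 Scala suffix are my guesses, not taken from this project.

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.3</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.6.3</version>
</dependency>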


> On 26 Apr 2017, at 6:58 PM, Aviem Zur <[email protected]> wrote:
> 
> Hi,
> 
> Can you please share which version of Beam you are using?
> 
> On Wed, Apr 26, 2017 at 1:51 PM 4498237@qq <[email protected]> wrote:
> Hi, here is my program using additional outputs in Apache Beam, and the result:
> public class DataExtraction2 {
>     public static void main(String[] args) {
>         System.setProperty("hadoop.home.dir", "C://hadoop/hadoop-2.6.1");
>         SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
>         options.setSparkMaster("local[4]");
> //        options.setCheckpointDir("./checkpoint");
>         options.setRunner(SparkRunner.class);
> //        options.setRunner(DirectRunner.class);
>         options.setStorageLevel("MEMORY_ONLY");
>         options.setAppName("testMavenDependency");
>         options.setBatchIntervalMillis(1000L);
>         options.setEnableSparkMetricSinks(false);
>         Pipeline p = Pipeline.create(options);
>         List<String> topics = Arrays.asList("beamOnSparkTest".split(","));
>     
>         final TupleTag<String> rawDataTag = new TupleTag<String>() {};
>         final TupleTag<String> exceptionTag = new TupleTag<String>() {};
>         final TupleTag<String> riskEventLogTag = new TupleTag<String>() {};
>         final TupleTag<String> statisticsTag = new TupleTag<String>() {};
>         final TupleTag<String> errorTargetLogTag = new TupleTag<String>() {};
>         final TupleTag<String> equipmentLogTag = new TupleTag<String>() {};
>         final TupleTag<String> performanceLogTag = new TupleTag<String>() {};
>         PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
>                 .withBootstrapServers("172.17.1.138:9092,172.17.1.137:9092")
>                 .withTopics(topics)
>                 .withConsumerFactoryFn(new CafintechConsumerFactoryFn())
>                 .withKeyCoder(VoidCoder.of())
>                 .withValueCoder(StringUtf8Coder.of())
>                 .withoutMetadata()
>         ).apply(Values.<String>create());
>         PCollectionTuple results = rawData.apply(
>                 ParDo.withOutputTags(rawDataTag,
>                         TupleTagList.of(exceptionTag)
>                                 .and(riskEventLogTag)
>                                 .and(statisticsTag)
>                                 .and(errorTargetLogTag)
>                                 .and(equipmentLogTag)
>                                 .and(performanceLogTag))
>                         .of(new DoFn<String, String>() {
>                             @ProcessElement
>                             public void processElement(ProcessContext c) {
>                                 String idCoop = "";
>                                 int eventType = 0;
>                                 int osPlatformType = -1;
>                                 String innerDecision = "";
>                                 String outterDecision = "";
>                                 // Date appTime = new Date();
>                                 String eventId = "";
>                                 //String strategyList = "";
>                                 String uuid = "";
>                                 String phoneNo = "";
>                                 int equipmentType = -1;
>                                 int antiFraudTime = -1;
>                                 ......
>                             }
>                         }));
>         p.run().waitUntilFinish();
>     }
> }
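> 
> (For context: the elided "......" part of processElement presumably ends by routing each record to one of the tags. A simplified, hypothetical sketch of that kind of routing is below; the conditions and values are made up, and I am assuming the Beam 0.6.0 API, where the main output is emitted with c.output(value) and the additional tags receive values via c.sideOutput(tag, value).)
> 
>         // Hypothetical sketch only, not the real elided code:
>         new DoFn<String, String>() {
>             @ProcessElement
>             public void processElement(ProcessContext c) {
>                 String line = c.element();
>                 c.output(line);                           // main output, goes to rawDataTag
>                 if (line.contains("EXCEPTION")) {         // placeholder condition
>                     c.sideOutput(exceptionTag, line);     // additional output
>                 } else if (line.contains("RISK_EVENT")) { // placeholder condition
>                     c.sideOutput(riskEventLogTag, line);  // additional output
>                 }
>             }
>         }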
> When I run this program, I get the following result:
> .....
> ....
> 2017-04-26 15:06:13,077 [pool-1-thread-1] [org.apache.spark.streaming.StreamingContext] [ERROR] - Error starting the context, marking it as stopped
> java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
> org.apache.beam.runners.spark.translation.EvaluationContext
> Serialization stack:
> - object not serializable (class: org.apache.beam.runners.spark.translation.EvaluationContext, value: org.apache.beam.runners.spark.translation.EvaluationContext@2807813e)
> - field (class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1, name: val$context, type: class org.apache.beam.runners.spark.translation.EvaluationContext)
> - object (class org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1, org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1@560cd8a8)
> - field (class: org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, name: transformFunc$3, type: interface org.apache.spark.api.java.function.Function)
> - object (class org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, <function1>)
> - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, name: cleanedF$2, type: interface scala.Function1)
> - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, <function2>)
> - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, name: cleanedF$3, type: interface scala.Function2)
> - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, <function2>)
> - field (class: org.apache.spark.streaming.dstream.TransformedDStream, name: transformFunc, type: interface scala.Function2)
> - object (class org.apache.spark.streaming.dstream.TransformedDStream, org.apache.spark.streaming.dstream.TransformedDStream@3ea9e1e5)
> - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
> 0 checkpoint files
> ])
> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
> - object (class org.apache.spark.streaming.dstream.InternalMapWithStateDStream, org.apache.spark.streaming.dstream.InternalMapWithStateDStream@23ab764d)
> - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
> 0 checkpoint files
> ])
> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
> - object (class org.apache.spark.streaming.dstream.FilteredDStream, org.apache.spark.streaming.dstream.FilteredDStream@5bbb0240)
> - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
> 0 checkpoint files
> ])
> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
> - object (class org.apache.spark.streaming.dstream.MapWithStateDStreamImpl, org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@24211bca)
> - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
> 0 checkpoint files
> ...
> ....
> 
> If there is only one main output (no additional output tags), the program works fine.
> Can you tell me why?
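> 
> For reference, the working single-output variant is roughly the same pipeline with the DoFn applied via plain ParDo.of(...) and no TupleTagList, i.e. something like this simplified sketch (the DoFn body here is just a placeholder):
> 
>         PCollection<String> mainOnly = rawData.apply(
>                 ParDo.of(new DoFn<String, String>() {
>                     @ProcessElement
>                     public void processElement(ProcessContext c) {
>                         c.output(c.element());  // single (main) output only
>                     }
>                 }));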
