Hi, here is my program, which uses additional (tagged) outputs with Apache Beam on the Spark runner, and the result:
public class DataExtraction2 {
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "C://hadoop/hadoop-2.6.1 
<c://hadoop/hadoop-2.6.1>");
        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        options.setSparkMaster("local[4]");
//        options.setCheckpointDir("./checkpoint");
        options.setRunner(SparkRunner.class);
//        options.setRunner(DirectRunner.class);
        options.setStorageLevel("MEMORY_ONLY");
        options.setAppName("testMavenDependency");
        options.setBatchIntervalMillis(1000L);
        options.setEnableSparkMetricSinks(false);
        Pipeline p = Pipeline.create(options);
        List<String> topics = Arrays.asList("beamOnSparkTest".split(","));
    
        // Anonymous TupleTag subclasses so the element type is captured at runtime.
        final TupleTag<String> rawDataTag = new TupleTag<String>() {};
        final TupleTag<String> exceptionTag = new TupleTag<String>() {};
        final TupleTag<String> riskEventLogTag = new TupleTag<String>() {};
        final TupleTag<String> statisticsTag = new TupleTag<String>() {};
        final TupleTag<String> errorTargetLogTag = new TupleTag<String>() {};
        final TupleTag<String> equipmentLogTag = new TupleTag<String>() {};
        final TupleTag<String> performanceLogTag = new TupleTag<String>() {};
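        // Read from Kafka; withoutMetadata() yields KV<Void, String> records,
        // and Values.create() keeps only the String payloads.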
        PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
                .withBootstrapServers("172.17.1.138:9092,172.17.1.137:9092")
                .withTopics(topics)
                .withConsumerFactoryFn(new CafintechConsumerFactoryFn())
                .withKeyCoder(VoidCoder.of())
                .withValueCoder(StringUtf8Coder.of())
                .withoutMetadata()
        ).apply(Values.<String>create());
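        // Split the stream into a main output (rawDataTag) plus six additional tagged outputs.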
        PCollectionTuple results = rawData.apply(
                ParDo.withOutputTags(rawDataTag,
                        TupleTagList.of(exceptionTag)
                                .and(riskEventLogTag)
                                .and(statisticsTag)
                                .and(errorTargetLogTag)
                                .and(equipmentLogTag)
                                .and(performanceLogTag))
                        .of(new DoFn<String, String>() {
                            @ProcessElement
                            public void processElement(ProcessContext c) {
                                String idCoop = "";
                                int eventType = 0;
                                int osPlatformType = -1;
                                String innerDecision = "";
                                String outterDecision = "";
                                // Date appTime = new Date();
                                String eventId = "";
                                //String strategyList = "";
                                String uuid = "";
                                String phoneNo = "";
                                int equipmentType = -1;
                                int antiFraudTime = -1;
                                // ... (rest of the field extraction and output logic elided) ...
                            }
                        }));
        p.run().waitUntilFinish();
    }
}
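For context, the elided part of the DoFn routes each record to one of the tagged outputs, and each output is read back out of the PCollectionTuple. A simplified sketch is below; isException(...) is an illustrative helper, not part of the real program, and on pre-2.0 Beam releases the multi-output call is c.sideOutput(tag, value) rather than c.output(tag, value):

// Simplified sketch of the elided routing logic.
@ProcessElement
public void processElement(ProcessContext c) {
    String record = c.element();
    if (isException(record)) { // illustrative predicate
        // Additional output; pre-2.0 Beam: c.sideOutput(exceptionTag, record)
        c.output(exceptionTag, record);
    } else {
        c.output(record); // main output, collected under rawDataTag
    }
}

// Each tagged output is then retrieved from the PCollectionTuple:
PCollection<String> mainOutput = results.get(rawDataTag);
PCollection<String> exceptions = results.get(exceptionTag);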
When I run this program, I get the following result:
...
2017-04-26 15:06:13,077 [pool-1-thread-1] [org.apache.spark.streaming.StreamingContext] [ERROR] - Error starting the context, marking it as stopped
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.beam.runners.spark.translation.EvaluationContext
Serialization stack:
- object not serializable (class: org.apache.beam.runners.spark.translation.EvaluationContext, value: org.apache.beam.runners.spark.translation.EvaluationContext@2807813e)
- field (class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1, name: val$context, type: class org.apache.beam.runners.spark.translation.EvaluationContext)
- object (class org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1, org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1@560cd8a8)
- field (class: org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, name: transformFunc$3, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, <function1>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, name: cleanedF$2, type: interface scala.Function1)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, <function2>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, name: cleanedF$3, type: interface scala.Function2)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, <function2>)
- field (class: org.apache.spark.streaming.dstream.TransformedDStream, name: transformFunc, type: interface scala.Function2)
- object (class org.apache.spark.streaming.dstream.TransformedDStream, org.apache.spark.streaming.dstream.TransformedDStream@3ea9e1e5)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [0 checkpoint files])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.InternalMapWithStateDStream, org.apache.spark.streaming.dstream.InternalMapWithStateDStream@23ab764d)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [0 checkpoint files])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.FilteredDStream, org.apache.spark.streaming.dstream.FilteredDStream@5bbb0240)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [0 checkpoint files])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.MapWithStateDStreamImpl, org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@24211bca)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [0 checkpoint files
...

If there is only one main output, the program works OK.
Can you tell me why?
