Re: A problem about additional outputs

2017-05-03 Thread Aviem Zur
By "cannot run normally" do you mean you get an exception? We recently had
a bug on master in which streaming pipelines containing `ParDo` with
multiple outputs ran into `NullPointerException`. This was fixed here:
https://issues.apache.org/jira/browse/BEAM-2029
Is this what you're facing? If so does pulling master and rebuilding help?


Re: A problem about additional outputs

2017-05-03 Thread zhenglin.Tian
Hi, I'm having trouble with additional outputs using SparkRunner.
Here is my code. When I use DirectRunner everything runs OK, but if I
replace DirectRunner with SparkRunner, the code doesn't run normally.

public class UnifiedDataExtraction {

    private static TupleTag<String> rawDataTag = new TupleTag<String>() {
    };

    private static TupleTag<String> exceptionTag = new TupleTag<String>() {
    };

    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", ConstantsOwn.HADOOP_HOME);

        SparkPipelineOptions options =
                PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
        options.setSparkMaster(ConstantsOwn.SPARK_MASTER);
        options.setRunner(SparkRunner.class);
//        options.setRunner(DirectRunner.class);
        options.setStorageLevel("MEMORY_ONLY");
        options.setAppName(ConstantsOwn.SPARK_APPNAME);
        options.setBatchIntervalMillis(1000L);
        options.setEnableSparkMetricSinks(false);
        Pipeline p = Pipeline.create(options);

        List<String> topics =
                Arrays.asList(ConstantsOwn.KAFKA_TOPIC_ANTIFRAUD.split(","));

        PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
                .withBootstrapServers(ConstantsOwn.KAFKA_ADDRESS)
                .withTopics(topics)
                //.withConsumerFactoryFn(new CafintechConsumerFactoryFn())
                .withKeyCoder(VoidCoder.of())
                .withValueCoder(StringUtf8Coder.of())
                .withKeyDeserializer(VoidDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata()
        ).apply(Values.create());

        // Simply prints each element of rawData. Able to run normally.   ①
        rawData.apply(ParDo.of(SimpleViewDoFn.of(true)));

        // ②
        PCollectionTuple results = rawData.apply("logAnatomyTest",
                ParDo.of(
                        new DoFn<String, String>() {
                            @ProcessElement
                            public void process(ProcessContext c) {
                                String element = c.element();
                                System.out.println("" + element);
                                if (!element.equals("EOF")) {
                                    c.output(c.element());
                                }
                            }
                        }
                ).withOutputTags(rawDataTag, TupleTagList.of(exceptionTag))
        );
        p.run().waitUntilFinish();
    }
}

In the previous code, the part marked ① runs normally, but at ② I can't
get anything.
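
For reference, a minimal sketch of what I understand the intended usage to
be: emit to both tags and then consume each resulting PCollection. This
assumes both outputs carry String elements; the printing DoFns under the
names "printMain" and "printExceptions" are just hypothetical placeholders.

        PCollectionTuple results = rawData.apply("logAnatomyTest",
                ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void process(ProcessContext c) {
                        String element = c.element();
                        if (!element.equals("EOF")) {
                            c.output(element);               // main output -> rawDataTag
                        } else {
                            c.output(exceptionTag, element); // additional output -> exceptionTag
                        }
                    }
                }).withOutputTags(rawDataTag, TupleTagList.of(exceptionTag)));

        // Each tagged output is an ordinary PCollection; it produces nothing
        // visible unless a downstream transform consumes it.
        results.get(rawDataTag).apply("printMain", ParDo.of(new DoFn<String, Void>() {
            @ProcessElement
            public void process(ProcessContext c) {
                System.out.println("main: " + c.element());
            }
        }));
        results.get(exceptionTag).apply("printExceptions", ParDo.of(new DoFn<String, Void>() {
            @ProcessElement
            public void process(ProcessContext c) {
                System.out.println("exception: " + c.element());
            }
        }));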

Here is my Beam version:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>0.7.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>0.7.0-SNAPSHOT</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>0.7.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-spark</artifactId>
    <version>0.7.0-SNAPSHOT</version>
</dependency>


Can someone please help me?




Re: A problem about additional outputs

2017-04-27 Thread Aviem Zur
Yes. Spark streaming support is still experimental and this issue exists in
Beam 0.6.0.

This has since been fixed, and the fix will be part of the upcoming release.

Since this isn't the first time a user has encountered this, I've created a
JIRA ticket for better visibility:
https://issues.apache.org/jira/browse/BEAM-2106

Thanks for reaching out! Please feel free to try out your pipeline using
the Beam master branch or one of the nightly SNAPSHOT builds.
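
For reference, resolving nightly SNAPSHOT artifacts typically requires the
Apache snapshots repository in your pom. A sketch, assuming the standard
repository location (please verify the URL for your environment):

<repositories>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>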


Re: A problem about additional outputs

2017-04-27 Thread 4498237@qq
Here is my maven configuration, thank you.


<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>0.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>0.6.0</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>0.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-spark</artifactId>
    <version>0.6.0</version>
</dependency>


> On 26 Apr 2017, at 6:58 PM, Aviem Zur  wrote:
> 
> Hi,
> 
> Can you please share which version of Beam you are using?
> 
> On Wed, Apr 26, 2017 at 1:51 PM 4498237@qq <4498...@qq.com> wrote:
> Hi, here is my program using additional outputs with Apache Beam, and
> the result:
> public class DataExtraction2 {
>     public static void main(String[] args) {
>         System.setProperty("hadoop.home.dir", "C://hadoop/hadoop-2.6.1");
>         SparkPipelineOptions options =
>                 PipelineOptionsFactory.as(SparkPipelineOptions.class);
>         options.setSparkMaster("local[4]");
> //        options.setCheckpointDir("./checkpoint");
>         options.setRunner(SparkRunner.class);
> //        options.setRunner(DirectRunner.class);
>         options.setStorageLevel("MEMORY_ONLY");
>         options.setAppName("testMavenDependency");
>         options.setBatchIntervalMillis(1000L);
>         options.setEnableSparkMetricSinks(false);
>         Pipeline p = Pipeline.create(options);
>         List<String> topics = Arrays.asList("beamOnSparkTest".split(","));
>
>         final TupleTag<String> rawDataTag = new TupleTag<String>() {
>         };
>         final TupleTag<String> exceptionTag = new TupleTag<String>() {
>         };
>         final TupleTag<String> riskEventLogTag = new TupleTag<String>() {
>         };
>         final TupleTag<String> statisticsTag = new TupleTag<String>() {
>         };
>         final TupleTag<String> errorTargetLogTag = new TupleTag<String>() {
>         };
>         final TupleTag<String> equipmentLogTag = new TupleTag<String>() {
>         };
>         final TupleTag<String> performanceLogTag = new TupleTag<String>() {
>         };
>         PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
>                 .withBootstrapServers("172.17.1.138:9092,172.17.1.137:9092")
>                 .withTopics(topics)
>                 .withConsumerFactoryFn(new CafintechConsumerFactoryFn())
>                 .withKeyCoder(VoidCoder.of())
>                 .withValueCoder(StringUtf8Coder.of())
>                 .withoutMetadata()
>         ).apply(Values.create());
>         PCollectionTuple results = rawData.apply(
>                 ParDo.withOutputTags(rawDataTag,
>                         TupleTagList.of(exceptionTag)
>                                 .and(riskEventLogTag)
>                                 .and(statisticsTag)
>                                 .and(errorTargetLogTag)
>                                 .and(equipmentLogTag)
>                                 .and(performanceLogTag))
>                         .of(new DoFn<String, String>() {
>                             @ProcessElement
>                             public void processElement(ProcessContext c) {
>                                 String idCoop = "";
>                                 int eventType = 0;
>                                 int osPlatformType = -1;
>                                 String innerDecision = "";
>                                 String outterDecision = "";
>                                 // Date appTime = new Date();
>                                 String eventId = "";
>                                 // String strategyList = "";
>                                 String uuid = "";
>                                 String phoneNo = "";
>                                 int equipmentType = -1;
>                                 int antiFraudTime = -1;
>                                 ..........
>                             }
>                         }));
>         p.run().waitUntilFinish();
>     }
> }
> When I run this program, I get this result:
> .
> 
> 2017-04-26 15:06:13,077 [pool-1-thread-1] 
> [org.apache.spark.streaming.StreamingContext] [ERROR] - Error starting the 
> context, marking it as stopped
> java.io.NotSerializableException: DStream checkpointing has been enabled but 
> the DStreams with their functions are not serializable
> org.apache.beam.runners.spark.translation.EvaluationContext
> Serialization stack:
> - object not serializable (class: 
> org.apache.beam.runners.spark.translation.EvaluationContext, value: 
> org.apache.beam.runners.spark.translation.EvaluationContext@2807813e)
> - field (class: 
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1,
>  name: val$context, type: class 
> org.apache.beam.runners.spark.translation.EvaluationContext)
> - object (class 
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1,
>  
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1@560cd8a8)
> - field (class: 
>