Re: A problem about additional outputs

2017-05-03 Thread Aviem Zur
By "cannot run normally" do you mean you get an exception? We recently had
a bug on master in which streaming pipelines containing `ParDo` with
multiple outputs ran into `NullPointerException`. This was fixed here:
https://issues.apache.org/jira/browse/BEAM-2029
Is this what you're facing? If so, does pulling master and rebuilding help?

On Thu, May 4, 2017 at 5:37 AM zhenglin.Tian wrote:

> Hi, I'm having trouble with additional outputs on SparkRunner.
> Here is my code. When I use DirectRunner everything runs OK, but if I
> replace DirectRunner with SparkRunner, the code doesn't run normally.
>
> public class UnifiedDataExtraction {
>
> private static TupleTag<String> rawDataTag = new TupleTag<String>() {
> };
>
> private static TupleTag<String> exceptionTag = new TupleTag<String>() {
> };
>
> public static void main(String[] args) {
> System.setProperty("hadoop.home.dir", ConstantsOwn.HADOOP_HOME);
>
> SparkPipelineOptions options =
> PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
> options.setSparkMaster(ConstantsOwn.SPARK_MASTER);
> options.setRunner(SparkRunner.class);
> //options.setRunner(DirectRunner.class);
> options.setStorageLevel("MEMORY_ONLY");
> options.setAppName(ConstantsOwn.SPARK_APPNAME);
> options.setBatchIntervalMillis(1000L);
> options.setEnableSparkMetricSinks(false);
> Pipeline p = Pipeline.create(options);
>
>
> List<String> topics =
> Arrays.asList(ConstantsOwn.KAFKA_TOPIC_ANTIFRAUD.split(","));
>
> PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
> .withBootstrapServers(ConstantsOwn.KAFKA_ADDRESS)
> .withTopics(topics)
> //.withConsumerFactoryFn(new CafintechConsumerFactoryFn())
> .withKeyCoder(VoidCoder.of())
> .withValueCoder(StringUtf8Coder.of())
> .withKeyDeserializer(VoidDeserializer.class)
> .withValueDeserializer(StringDeserializer.class)
> .withoutMetadata()
> ).apply(Values.create());
>
> rawData.apply(ParDo.of(SimpleViewDoFn.of(true))); // simply prints
> each element of rawData. Runs normally   ①
> PCollectionTuple results = rawData.apply("logAnatomyTest",
>//   ②
> ParDo.of(
> new DoFn<String, String>() {
> @ProcessElement
> public void process(ProcessContext c) {
> String element = c.element();
> System.out.println(""+element);
> if (!element.equals("EOF")) {
> c.output(c.element());
> }
> }
> }
> ).withOutputTags(rawDataTag, TupleTagList.of(exceptionTag))
> );
> p.run().waitUntilFinish();
>}
> }
>
> In the previous code, the line commented with ① runs
> normally, but at ② I can't get any output.
>
> Here is my Beam version:
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-sdks-java-core</artifactId>
>     <version>0.7.0-SNAPSHOT</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-runners-direct-java</artifactId>
>     <version>0.7.0-SNAPSHOT</version>
>     <scope>runtime</scope>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-sdks-java-io-kafka</artifactId>
>     <version>0.7.0-SNAPSHOT</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-runners-spark</artifactId>
>     <version>0.7.0-SNAPSHOT</version>
> </dependency>
>
>
> Could someone please help me?
>
>
>
>
> On 2017/4/28 4:43:23, Aviem Zur  wrote:
> Yes. Spark streaming support is still experimental and this issue exists
> in Beam 0.6.0
>
> This has since been fixed and the fix will be a part of the upcoming
> release.
>
> Since this isn't the first time a user has encountered this I've created a
> JIRA ticket for better visibility for this issue:
> https://issues.apache.org/jira/browse/BEAM-2106
>
> Thanks for reaching out! Please feel free to try out your pipeline using
> Beam master branch or one of the nightly SNAPSHOT builds.
>
> On Thu, Apr 27, 2017 at 9:58 AM 4498237@qq <4498...@qq.com> wrote:
>
>> Here is my maven configuration, thank you.
>>
>> <dependency>
>>   <groupId>org.apache.beam</groupId>
>>   <artifactId>beam-sdks-java-core</artifactId>
>>   <version>0.6.0</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.beam</groupId>
>>   <artifactId>beam-runners-direct-java</artifactId>
>>   <version>0.6.0</version>
>>   <scope>runtime</scope>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.beam</groupId>
>>   <artifactId>beam-sdks-java-io-kafka</artifactId>
>>   <version>0.6.0</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.beam</groupId>
>>   <artifactId>beam-runners-spark</artifactId>
>>   <version>0.6.0</version>
>> </dependency>
>>
>>
>> On 26 Apr 2017, at 6:58 PM, Aviem Zur  wrote:
>>
>> Hi,
>>
>> Can you please share which version of Beam you are using?
>>
>> On Wed, Apr 26, 2017 at 1:51 PM 4498237@qq <4498...@qq.com> wrote:
>>
>>> Hi, here is my program about additional outputs for Apache Beam
>>> and the result:
>>> public class 

Re: A problem about additional outputs

2017-05-03 Thread zhenglin.Tian
Hi, I'm having trouble with additional outputs on SparkRunner.
Here is my code. When I use DirectRunner everything runs OK, but if I replace
DirectRunner with SparkRunner, the code doesn't run normally.

public class UnifiedDataExtraction {
   
    private static TupleTag<String> rawDataTag = new TupleTag<String>() {
    };

    private static TupleTag<String> exceptionTag = new TupleTag<String>() {
    };

    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", ConstantsOwn.HADOOP_HOME);

        SparkPipelineOptions options = 
PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
        options.setSparkMaster(ConstantsOwn.SPARK_MASTER);
        options.setRunner(SparkRunner.class);
//        options.setRunner(DirectRunner.class);
        options.setStorageLevel("MEMORY_ONLY");
        options.setAppName(ConstantsOwn.SPARK_APPNAME);
        options.setBatchIntervalMillis(1000L);
        options.setEnableSparkMetricSinks(false);
        Pipeline p = Pipeline.create(options);


        List<String> topics =
Arrays.asList(ConstantsOwn.KAFKA_TOPIC_ANTIFRAUD.split(","));

        PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
                .withBootstrapServers(ConstantsOwn.KAFKA_ADDRESS)
                .withTopics(topics)
                //.withConsumerFactoryFn(new CafintechConsumerFactoryFn())
                .withKeyCoder(VoidCoder.of())
                .withValueCoder(StringUtf8Coder.of())
                .withKeyDeserializer(VoidDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata()
        ).apply(Values.create());
       
        rawData.apply(ParDo.of(SimpleViewDoFn.of(true))); // simply prints each
element of rawData. Runs normally   ①
        PCollectionTuple results = rawData.apply("logAnatomyTest",              
                                                          //   ②
                ParDo.of(
                        new DoFn<String, String>() {
                            @ProcessElement
                            public void process(ProcessContext c) {
                                String element = c.element();
                                System.out.println(""+element);
                                if (!element.equals("EOF")) {
                                    c.output(c.element());
                                }
                            }
                        }
                ).withOutputTags(rawDataTag, TupleTagList.of(exceptionTag))
        );
        p.run().waitUntilFinish();
   }
}

In the previous code, the line commented with ① runs
normally, but at ② I can't get any output.
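
For comparison, here is a minimal sketch of a multi-output ParDo that both emits to
and consumes the extra tag. The EOF-to-exceptionTag routing and the reuse of
SimpleViewDoFn are assumptions for illustration only; on some Beam versions the
tagged emit is c.sideOutput(tag, value) rather than c.output(tag, value):

        PCollectionTuple results = rawData.apply("logAnatomyTest",
                ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void process(ProcessContext c) {
                        String element = c.element();
                        if (!element.equals("EOF")) {
                            c.output(element);                // main output -> rawDataTag
                        } else {
                            c.output(exceptionTag, element);  // additional output
                        }
                    }
                }).withOutputTags(rawDataTag, TupleTagList.of(exceptionTag)));

        // Nothing is observed unless the tagged outputs are actually consumed:
        results.get(rawDataTag).apply("PrintMain", ParDo.of(SimpleViewDoFn.of(true)));
        results.get(exceptionTag).apply("PrintErrors", ParDo.of(SimpleViewDoFn.of(true)));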

Here is my Beam version:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>0.7.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>0.7.0-SNAPSHOT</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>0.7.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-spark</artifactId>
    <version>0.7.0-SNAPSHOT</version>
</dependency>


Could someone please help me?



On 2017/4/28 4:43:23, Aviem Zur  wrote:
Yes. Spark streaming support is still experimental and this issue exists in 
Beam 0.6.0

This has since been fixed and the fix will be a part of the upcoming release.

Since this isn't the first time a user has encountered this I've created a JIRA 
ticket for better visibility for this issue: 
https://issues.apache.org/jira/browse/BEAM-2106 

Thanks for reaching out! Please feel free to try out your pipeline using Beam
master branch or one of the nightly SNAPSHOT builds.

On Thu, Apr 27, 2017 at 9:58 AM 4498237@qq <4498...@qq.com> wrote:

Here is my maven configuration, thank you.

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>0.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>0.6.0</version>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>0.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>0.6.0</version>
</dependency>


On 26 Apr 2017, at 6:58 PM, Aviem Zur  wrote:

Hi,

Can you please share which version of Beam you are using?

On Wed, Apr 26, 2017 at 1:51 PM 4498237@qq <4498...@qq.com> wrote:

Hi, here is my program about additional outputs for Apache Beam and the
result:
public class DataExtraction2 {
    public static void main(String[] args) {
    System.setProperty("hadoop.home.dir", "C://hadoop/hadoop-2.6.1");
    SparkPipelineOptions options = 
PipelineOptionsFactory.as(SparkPipelineOptions.class);
    options.setSparkMaster("local[4]");
//    options.setCheckpointDir("./checkpoint");
    options.setRunner(SparkRunner.class);
//    options.setRunner(DirectRunner.class);
    options.setStorageLevel("MEMORY_ONLY");
    


Re: BigQuery join in Apache beam

2017-05-03 Thread Prabeesh K.
Hi Dan

Thank you for your prompt reply.

Regards,
Prabeesh K.

On 3 May 2017 at 19:23, Dan Halperin  wrote:

> Hi Prabeesh,
>
> The underlying Beam primitive you use for Join is CoGroupByKey – this
> takes N different collections KV<K, V1>, KV<K, V2>, ... KV<K, Vn> and
> produces one collection KV<K, [Iterable<V1>, Iterable<V2>, ...
> Iterable<Vn>]>. This is a compressed representation of a Join result, in
> that you can expand it to a full outer join, you can implement inner join,
> and you can implement lots of other join algorithms.
>
> There is also a Join library that does this under the hood:
> https://github.com/apache/beam/tree/master/sdks/java/extensions/join-library
>
> Dan
>
> On Wed, May 3, 2017 at 6:30 AM, Prabeesh K.  wrote:
>
>> Hi Dan,
>>
>> Sorry for the late response.
>>
>> I agree with you on the use cases that you mentioned.
>>
>> Please advise me, and share any sample code for joining two data
>> sets in Beam that share some common keys.
>>
>> Regards,
>> Prabeesh K.
>>
>> On 6 February 2017 at 10:38, Dan Halperin  wrote:
>>
>>> Definitely, using BigQuery for what BigQuery is really good at (big
>>> scans and cost-based joins) is nearly always a good idea. A strong
>>> endorsement of Ankur's answer.
>>>
>>> Pushing the right amount of work into a database is an art, however --
>>> there are some scenarios where you'd rather scan in BQ and join in Beam
>>> because the join result is very large and you can better filter it in Beam,
>>> or because you need to do some pre-join-filtering based on an external API
>>> call (and you don't want to load the results of that API call into
>>> BigQuery)...
>>>
>>> I've only seen a few, rare, cases of the latter.
>>>
>>> Thanks,
>>> Dan
>>>
>>> On Sun, Feb 5, 2017 at 9:19 PM, Prabeesh K. wrote:
>>>
 Hi Ankur,

 Thank you for your response.

 On 5 February 2017 at 23:59, Ankur Chauhan  wrote:

> I have found doing joins in BigQuery using SQL is a lot faster and
> easier to iterate upon.
>
>
> Ankur Chauhan
> On Sat, Feb 4, 2017 at 22:05 Prabeesh K.  wrote:
>
>> Hi,
>>
>> Which is the better way to join two tables in Apache Beam?
>>
>> Regards,
>> Prabeesh K.
>>
>

>>>
>>
>


Re: BigQuery join in Apache beam

2017-05-03 Thread Dan Halperin
Hi Prabeesh,

The underlying Beam primitive you use for Join is CoGroupByKey – this takes
N different collections KV<K, V1>, KV<K, V2>, ... KV<K, Vn> and produces
one collection KV<K, [Iterable<V1>, Iterable<V2>, ... Iterable<Vn>]>. This
is a compressed representation of a Join result, in that you can expand it
to a full outer join, you can implement inner join, and you can implement
lots of other join algorithms.

There is also a Join library that does this under the hood:
https://github.com/apache/beam/tree/master/sdks/java/extensions/join-library
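
As a minimal sketch of that pattern (the keyed inputs emails and phones, their
String values, and the inner-join expansion below are assumptions for
illustration, not code from this thread):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.join.CoGbkResult;
    import org.apache.beam.sdk.transforms.join.CoGroupByKey;
    import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TupleTag;

    // Tags identify each input collection inside the co-grouped result.
    final TupleTag<String> emailsTag = new TupleTag<>();
    final TupleTag<String> phonesTag = new TupleTag<>();

    // emails and phones are both PCollection<KV<String, String>>, keyed by user id.
    PCollection<KV<String, CoGbkResult>> grouped =
        KeyedPCollectionTuple.of(emailsTag, emails)
            .and(phonesTag, phones)
            .apply(CoGroupByKey.<String>create());

    // Expand the compressed representation; this particular expansion is an inner join.
    PCollection<String> joined = grouped.apply(ParDo.of(
        new DoFn<KV<String, CoGbkResult>, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            String key = c.element().getKey();
            for (String email : c.element().getValue().getAll(emailsTag)) {
              for (String phone : c.element().getValue().getAll(phonesTag)) {
                c.output(key + "," + email + "," + phone);
              }
            }
          }
        }));

The join-library extension linked above packages essentially this pattern behind
helpers such as Join.innerJoin, Join.leftOuterJoin, Join.rightOuterJoin and
Join.fullOuterJoin.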


Dan

On Wed, May 3, 2017 at 6:30 AM, Prabeesh K.  wrote:

> Hi Dan,
>
> Sorry for the late response.
>
> I agree with you on the use cases that you mentioned.
>
> Please advise me, and share any sample code for joining two data
> sets in Beam that share some common keys.
>
> Regards,
> Prabeesh K.
>
> On 6 February 2017 at 10:38, Dan Halperin  wrote:
>
>> Definitely, using BigQuery for what BigQuery is really good at (big scans
>> and cost-based joins) is nearly always a good idea. A strong endorsement of
>> Ankur's answer.
>>
>> Pushing the right amount of work into a database is an art, however --
>> there are some scenarios where you'd rather scan in BQ and join in Beam
>> because the join result is very large and you can better filter it in Beam,
>> or because you need to do some pre-join-filtering based on an external API
>> call (and you don't want to load the results of that API call into
>> BigQuery)...
>>
>> I've only seen a few, rare, cases of the latter.
>>
>> Thanks,
>> Dan
>>
>> On Sun, Feb 5, 2017 at 9:19 PM, Prabeesh K.  wrote:
>>
>>> Hi Ankur,
>>>
>>> Thank you for your response.
>>>
>>> On 5 February 2017 at 23:59, Ankur Chauhan  wrote:
>>>
 I have found doing joins in BigQuery using SQL is a lot faster and
 easier to iterate upon.


 Ankur Chauhan
 On Sat, Feb 4, 2017 at 22:05 Prabeesh K.  wrote:

> Hi,
>
> Which is the better way to join two tables in Apache Beam?
>
> Regards,
> Prabeesh K.
>

>>>
>>
>


Re: BigQuery join in Apache beam

2017-05-03 Thread Prabeesh K.
Hi Dan,

Sorry for the late response.

I agree with you on the use cases that you mentioned.

Please advise me, and share any sample code for joining two data
sets in Beam that share some common keys.

Regards,
Prabeesh K.

On 6 February 2017 at 10:38, Dan Halperin  wrote:

> Definitely, using BigQuery for what BigQuery is really good at (big scans
> and cost-based joins) is nearly always a good idea. A strong endorsement of
> Ankur's answer.
>
> Pushing the right amount of work into a database is an art, however --
> there are some scenarios where you'd rather scan in BQ and join in Beam
> because the join result is very large and you can better filter it in Beam,
> or because you need to do some pre-join-filtering based on an external API
> call (and you don't want to load the results of that API call into
> BigQuery)...
>
> I've only seen a few, rare, cases of the latter.
>
> Thanks,
> Dan
>
> On Sun, Feb 5, 2017 at 9:19 PM, Prabeesh K.  wrote:
>
>> Hi Ankur,
>>
>> Thank you for your response.
>>
>> On 5 February 2017 at 23:59, Ankur Chauhan  wrote:
>>
>>> I have found doing joins in BigQuery using SQL is a lot faster and
>>> easier to iterate upon.
>>>
>>>
>>> Ankur Chauhan
>>> On Sat, Feb 4, 2017 at 22:05 Prabeesh K.  wrote:
>>>
 Hi,

 Which is the better way to join two tables in Apache Beam?

 Regards,
 Prabeesh K.

>>>
>>
>


BigQuery table backed by Google Sheet

2017-05-03 Thread Prabeesh K.
Hi,

How can we read a BigQuery table that is backed by a Google Sheet?

I am getting the following error:

"error": {
  "errors": [
   {
"domain": "global",
"reason": "accessDenied",
"message": "Access Denied: BigQuery BigQuery: Permission denied while
globbing file pattern.",
"locationType": "other",
"location": "/gdrive/id/--"
   }
  ],
  "code": 403,
  "message": "Access Denied: BigQuery BigQuery: Permission denied while
globbing file pattern."
 }
}


Please help me fix this issue.


Regards,
Prabeesh K.


Re: Reprocessing historic data with streaming jobs

2017-05-03 Thread Lars BK
Thanks for your input and sorry for the late reply.

Lukasz, you may be right that running the reprocessing as a batch job will
be better and faster. I'm still experimenting with approach 3 where I
publish all messages and then start the job to let the watermark progress
through the data. It seems to be working fairly well right now, but I'm not
sure that the "somewhat ordered" data I send is "ordered enough". (I can
send data ordered by date, but within each date I can give no guarantees.)

Thomas, I had not thought of that, thanks. I like the idea, sounds like it
will handle the merge between archive and live data automatically which
would be very nice.
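
A minimal sketch of that flatten-the-archive approach (the TextIO path, the
extractTimestamp helper, and the exact PubsubIO/TextIO method names are
assumptions; the IO APIs differ between Beam versions, and imports are omitted):

    // Live, unbounded input from Pubsub.
    PCollection<String> live = p.apply("ReadLive",
            PubsubIO.readStrings().fromTopic("projects/my-project/topics/events"));

    // Bounded, archival input; bounded elements start at the minimum timestamp,
    // so re-stamping them with their event time is allowed.
    PCollection<String> archive = p
            .apply("ReadArchive", TextIO.read().from("gs://my-bucket/archive/*.json"))
            .apply("StampEventTime", ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    // extractTimestamp is a placeholder for your own parsing logic.
                    c.outputWithTimestamp(c.element(), extractTimestamp(c.element()));
                }
            }));

    // One merged stream: the watermark holds until the bounded archive is consumed,
    // then the pipeline continues on the Pubsub source alone.
    PCollection<String> all = PCollectionList.of(live).and(archive)
            .apply(Flatten.pCollections());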

And Ankur, your case sounds similar. I'm starting to lean towards doing
batch jobs for reprocessing too.

I am going to keep experimenting with different approaches (until I have to
move on), and I'll do my best to update here with my findings later.


Lars

On Mon, May 1, 2017 at 6:51 PM Ankur Chauhan  wrote:

> I have sort of a similar usecase when dealing with failed / cancelled /
> broken streaming pipelines.
> We have an operator that continuously monitors the min-watermark of the
> pipeline, and when it detects that the watermark has not advanced for more
> than some threshold, we start a new pipeline and initiate a "patcher" batch
> Dataflow job that reads the event backups over the possibly broken time range
> (+/- 1 hour).
> It works out well but has the overhead of having to build out an external
> operator process that can detect when to do the batch dataflow process.
>
> Sent from my iPhone
>
> On May 1, 2017, at 09:37, Thomas Groh  wrote:
>
> You should also be able to simply add a Bounded Read from the backup data
> source to your pipeline and flatten it with your Pubsub topic. Because all
> of the elements produced by both the bounded and unbounded sources will
> have consistent timestamps, when you run the pipeline the watermark will be
> held until all of the data is read from the bounded sources. Once this is
> done, your pipeline can continue processing only elements from the PubSub
> source. If you don't want the backlog and the current processing to occur
> in the same pipeline, running the same pipeline but just reading from the
> archival data should be sufficient (all of the processing would be
> identical, just the source would need to change).
>
> If you read from both the "live" and "archival" sources within the same
> pipeline, you will need to use additional machines so the backlog can be
> processed promptly if you use a watermark based trigger; watermarks will be
> held until the bounded source is fully processed.
>
> On Mon, May 1, 2017 at 9:29 AM, Lars BK  wrote:
>
>> I did not see Lukasz reply before I posted, and I will have to read it a
>> bit later!
>>
>> On Mon, May 1, 2017 at 18:28, Lars BK wrote:
>>
>>> Yes, precisely.
>>>
>>> I think that could work, yes. What you are suggesting sounds like idea
>>> 2) in my original question.
>>>
>>> My main concern is that I would have to allow a great deal of lateness
>>> and that old windows would consume too much memory. Whether it works in my
>>> case or not I don't know yet as I haven't tested it.
>>>
>>> What if I had to process even older data? Could I handle any "oldness"
>>> of data by increasing the allowed lateness and throwing machines at the
>>> problem to hold all the old windows in memory while the backlog is
>>> processed? If so, great! But I would have to dial the allowed lateness back
>>> down when the processing has caught up with the present.
>>>
>>> Is there some intended way of handling reprocessing like this? Maybe
>>> not? Perhaps it is more of a Pubsub and Dataflow question than a Beam
>>> question when it comes down to it.
>>>
>>>
>>> On Mon, May 1, 2017 at 17:25, Jean-Baptiste Onofré wrote:
>>>
 OK, so the messages are "re-published" on the topic, with the same
 timestamp as the original and consumed again by the pipeline.

 Maybe you can play with the allowed lateness and late firings?

 Something like:

 Window.into(FixedWindows.of(Duration.standardMinutes(xx)))
     .triggering(AfterWatermark.pastEndOfWindow()
         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
             .plusDelayOf(FIVE_MINUTES))
         .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
             .plusDelayOf(TEN_MINUTES)))
     .withAllowedLateness(Duration.standardMinutes(xx))
     .accumulatingFiredPanes()

 Thoughts ?

 Regards
 JB

 On 05/01/2017 05:12 PM, Lars BK wrote:
 > Hi Jean-Baptiste,
 >
 > I think the key point in my case is that I have to process or
 reprocess "old"
 > messages. That is, messages that are late because they are streamed
 from an
 > archive file and are older