Re: Regarding Beam Slack Channel

2017-06-23 Thread Manu Zhang
Welcome Ratnakar!!!

On Sat, Jun 24, 2017 at 10:18 AM Ratnakar Malla  wrote:

> Hello Manu,
> Can you please invite me to the Slack channel as well?
> Thanks
> Ratnakar
>
> On Fri, Jun 23, 2017 at 5:44 PM, Manu Zhang 
> wrote:
>
>> Sent to both of you
>>
>> On Sat, Jun 24, 2017 at 4:03 AM Hillerman, Carl <
>> carl_hiller...@homedepot.com> wrote:
>>
>>> Ditto
>>>
>>>
>>>
>>> Carl Hillerman | Staff Software Engineer
>>> The Home Depot | Store Support Center
>>> Email: carl_hiller...@homedepot.com
>>> Office: (770)433-8211 | Extn: 24292
>>> Jabber: (470)689-4292
>>>
>>>
>>>
>>> From: Jyotirmoy Sundi
>>> Reply-To: "user@beam.apache.org"
>>> Date: Friday, June 23, 2017 at 4:00 PM
>>> To: "user@beam.apache.org"
>>> Subject: Re: Regarding Beam Slack Channel
>>>
>>>
>>>
>>> Hi Manu,
>>>
>>> Could you please send me an invitation as well?
>>>
>>>
>>>
>>> On Fri, Jun 23, 2017 at 4:31 AM, Manu Zhang 
>>> wrote:
>>>
>>> Hey Kamil,
>>>
>>>
>>>
>>> Just sent you an invitation.
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Jun 23, 2017 at 6:38 PM Kamil Szewczyk 
>>> wrote:
>>>
>>> Hello,
>>>
>>> Can someone please add me to the Beam Slack channel?
>>>
>>> Thanks.
>>>
>>>
>>>
>>>
>>>
>>> --
>>>
>>> Best Regards,
>>> Jyotirmoy Sundi
>>>
>>>
>>>
>>> --
>>>
>>> The information in this Internet Email is confidential and may be
>>> legally privileged. It is intended solely for the addressee. Access to this
>>> Email by anyone else is unauthorized. If you are not the intended
>>> recipient, any disclosure, copying, distribution or any action taken or
>>> omitted to be taken in reliance on it, is prohibited and may be unlawful.
>>> When addressed to our clients any opinions or advice contained in this
>>> Email are subject to the terms and conditions expressed in any applicable
>>> governing The Home Depot terms of business or client engagement letter. The
>>> Home Depot disclaims all responsibility and liability for the accuracy and
>>> content of this attachment and for any damages or losses arising from any
>>> inaccuracies, errors, viruses, e.g., worms, trojan horses, etc., or other
>>> items of a destructive nature, which may be contained in this attachment
>>> and shall not be liable for direct, indirect, consequential or special
>>> damages in connection with this e-mail message or its attachment.
>>>
>>
>


Re: Regarding Beam Slack Channel

2017-06-23 Thread Manu Zhang
Sent to both of you

On Sat, Jun 24, 2017 at 4:03 AM Hillerman, Carl <
carl_hiller...@homedepot.com> wrote:

> Ditto
>
>
>
> Carl Hillerman | Staff Software Engineer
> The Home Depot | Store Support Center
> Email: carl_hiller...@homedepot.com
> Office: (770)433-8211 | Extn: 24292
> Jabber: (470)689-4292
>
>
>
> From: Jyotirmoy Sundi
> Reply-To: "user@beam.apache.org"
> Date: Friday, June 23, 2017 at 4:00 PM
> To: "user@beam.apache.org"
> Subject: Re: Regarding Beam Slack Channel
>
>
>
> Hi Manu,
>
> Could you please send me an invitation as well?
>
>
>
> On Fri, Jun 23, 2017 at 4:31 AM, Manu Zhang 
> wrote:
>
> Hey Kamil,
>
>
>
> Just sent you an invitation.
>
>
>
>
>
> On Fri, Jun 23, 2017 at 6:38 PM Kamil Szewczyk  wrote:
>
> Hello,
>
> Can someone please add me to the Beam Slack channel?
>
> Thanks.
>
>
>
>
>
> --
>
> Best Regards,
> Jyotirmoy Sundi
>
>
>
>


Re: Regarding Beam Slack Channel

2017-06-23 Thread Hillerman, Carl
Ditto

Carl Hillerman | Staff Software Engineer
The Home Depot | Store Support  Center
Email: carl_hiller...@homedepot.com
Office: (770)433-8211 | Extn: 24292
Jabber: (470)689-4292

From: Jyotirmoy Sundi 
Reply-To: "user@beam.apache.org" 
Date: Friday, June 23, 2017 at 4:00 PM
To: "user@beam.apache.org" 
Subject: Re: Regarding Beam Slack Channel

Hi Manu,
Could you please send me an invitation as well?

On Fri, Jun 23, 2017 at 4:31 AM, Manu Zhang wrote:
Hey Kamil,

Just sent you an invitation.


On Fri, Jun 23, 2017 at 6:38 PM Kamil Szewczyk wrote:
Hello,

Can someone please add me to the Beam Slack channel?

Thanks.



--
Best Regards,
Jyotirmoy Sundi






Re: Regarding Beam Slack Channel

2017-06-23 Thread Jyotirmoy Sundi
Hi Manu,
Could you please send me an invitation as well?

On Fri, Jun 23, 2017 at 4:31 AM, Manu Zhang  wrote:

> Hey Kamil,
>
> Just sent you an invitation.
>
>
> On Fri, Jun 23, 2017 at 6:38 PM Kamil Szewczyk  wrote:
>
>> Hello,
>>
>> Can someone please add me to the Beam Slack channel?
>>
>> Thanks.
>>
>


-- 
Best Regards,
Jyotirmoy Sundi


Re: Reply: Creating side input map with global window

2017-06-23 Thread Lukasz Cwik
To unit test your function, have it accept a supplier. By default, use a
supplier that returns a reference to the static instance, and pass a
different supplier in tests.
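The supplier approach described here might look roughly like the following plain-Java sketch. It illustrates the idea under stated assumptions and is not code from the thread. `SharedSchemas`, `SchemaParser`, and `SchemaSource` are hypothetical names. A `ConcurrentHashMap` stands in for the static Guava `LoadingCache` so that the example compiles without Beam or Guava. The no-argument constructor reads the shared static instance, and a test passes its own supplier backed by local data instead of cloud storage.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical JVM-wide holder for the shared schema map. In the thread this
// role is played by a static Guava LoadingCache; a ConcurrentHashMap stands in.
final class SharedSchemas {
  static final Map<String, String> INSTANCE = new ConcurrentHashMap<>();

  private SharedSchemas() {}
}

// Hypothetical lookup logic that a DoFn could delegate to.
final class SchemaParser implements Serializable {
  // Serializable so the supplier can travel inside a serialized DoFn
  // (assumption: the parser is held as a DoFn field).
  interface SchemaSource extends Supplier<Map<String, String>>, Serializable {}

  private final SchemaSource schemas;

  // Production default: every call reads through to the static shared instance.
  SchemaParser() {
    this(() -> SharedSchemas.INSTANCE);
  }

  // Tests inject a source backed by local data instead.
  SchemaParser(SchemaSource schemas) {
    this.schemas = schemas;
  }

  // Returns the schema registered under name, or null if none is known.
  String schemaFor(String name) {
    return schemas.get().get(name);
  }
}
```

In a unit test, `new SchemaParser(() -> Collections.singletonMap("user", "test-schema"))` exercises the same lookup logic without touching the static cache.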

On Fri, Jun 23, 2017 at 8:23 AM, Kevin Peterson  wrote:

> Hey Lukasz,
>
> I tried using the setup function, but since this is a streaming pipeline, the
> batches tend to be pretty small. I could force the pipeline to batch things
> up, but that feels like something that shouldn't be needed. I was already
> caching between elements within a thread; the problem was at pipeline
> start, or when a new instance was started, since each thread has its own
> cache.
>
> Using a static cache worked!
>
> private static final LoadingCache CACHE =
>     CacheBuilder.newBuilder()
>         .refreshAfterWrite(30, TimeUnit.MINUTES)
>         .build(new CacheLoader());
>
> This has gotten me unblocked, but isn't a perfect solution. Because the
> cache is static, I can't set any parameters of it, meaning that it is very
> hard to unit test because it is hard-coded to access cloud storage instead
> of a local file.
>
> I tried using a Singleton to hold the cache and fetch it from the DoFn,
> but it seems like the Singleton isn't shared amongst all of the threads. I
> can see from the logs that all of the DoFn calls are on the same worker
> instance but in different threads, yet I see a log statement from inside my
> synchronized block for each thread, which shouldn't be possible.
>
> Thoughts?
>
>
> On Thu, Jun 15, 2017 at 6:26 AM, Lukasz Cwik  wrote:
>
>> Take a look at DoFn setup/teardown, which are called once per DoFn instance
>> rather than once per element, making initialization code easier to write.
>>
>> Also if the schema map is shared, have you thought of using a single
>> static instance of Guava's LoadingCache shared amongst all the DoFn
>> instances?
>>
>> You can also refresh the data stored within the cache periodically.
>>

Re: Reply: Creating side input map with global window

2017-06-23 Thread Kevin Peterson
Hey Lukasz,

I tried using the setup function, but since this is a streaming pipeline, the
batches tend to be pretty small. I could force the pipeline to batch things
up, but that feels like something that shouldn't be needed. I was already
caching between elements within a thread; the problem was at pipeline
start, or when a new instance was started, since each thread has its own
cache.

Using a static cache worked!

private static final LoadingCache CACHE =
    CacheBuilder.newBuilder()
        .refreshAfterWrite(30, TimeUnit.MINUTES)
        .build(new CacheLoader());
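For comparison with the Guava snippet above, here is a simplified, standard-library-only sketch of refresh-after-write behavior. It is a stand-in under stated assumptions, not Guava's implementation, and it does not reproduce all of Guava's semantics (for example, concurrency behavior during a reload, size bounds, or eviction). `RefreshingCache` is a hypothetical class. Because both the loader and the clock are injected, a static instance of it can still be unit tested with a fake clock and an in-memory loader.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical, simplified refresh-after-write cache. An entry older than the
// refresh interval is reloaded synchronously by the next get() for its key.
final class RefreshingCache<K, V> {
  private static final class Entry<T> {
    final T value;
    final long writtenAtNanos;

    Entry(T value, long writtenAtNanos) {
      this.value = value;
      this.writtenAtNanos = writtenAtNanos;
    }
  }

  private final ConcurrentHashMap<K, Entry<V>> entries = new ConcurrentHashMap<>();
  private final Function<K, V> loader;
  private final long refreshNanos;
  private final LongSupplier clockNanos;

  // loader: fetches a fresh value (storage in production, a local map in tests).
  // clockNanos: System::nanoTime in production, a fake clock in tests.
  RefreshingCache(Function<K, V> loader, long refresh, TimeUnit unit, LongSupplier clockNanos) {
    this.loader = loader;
    this.refreshNanos = unit.toNanos(refresh);
    this.clockNanos = clockNanos;
  }

  V get(K key) {
    long now = clockNanos.getAsLong();
    // ConcurrentHashMap.compute is atomic per key, so threads racing on a
    // missing or stale key trigger only one load for it.
    Entry<V> entry = entries.compute(key, (k, old) ->
        (old == null || now - old.writtenAtNanos >= refreshNanos)
            ? new Entry<V>(loader.apply(k), now)
            : old);
    return entry.value;
  }
}
```

A static field could then be declared as `private static final RefreshingCache<String, String> CACHE = new RefreshingCache<>(name -> loadSchema(name), 30, TimeUnit.MINUTES, System::nanoTime);`, where `loadSchema` is a hypothetical helper that reads from storage.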

This has gotten me unblocked, but isn't a perfect solution. Because the
cache is static, I can't set any parameters of it, meaning that it is very
hard to unit test because it is hard-coded to access cloud storage instead
of a local file.

I tried using a Singleton to hold the cache and fetch it from the DoFn, but
it seems like the Singleton isn't shared amongst all of the threads. I can
see from the logs that all of the DoFn calls are on the same worker
instance but in different threads, yet I see a log statement from inside my
synchronized block for each thread, which shouldn't be possible.

Thoughts?
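The thread does not show the Singleton code, so the cause of the per-thread initialization is unknown. Two common causes, offered here only as assumptions, are:

- Holding the singleton in a DoFn instance field. DoFns are serialized, so each deserialized copy gets its own object.
- Synchronizing on an instance rather than on a class-wide lock.

A JVM-wide lazy singleton can instead use the initialization-on-demand holder idiom, sketched below with the hypothetical name `SchemaCacheHolder`. The JVM runs a class's static initializer once per class loader, so every thread that reaches it through the same class loader sees the same object.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Initialization-on-demand holder: the JVM runs Holder's static initializer
// once per class loader, on first access, and class initialization itself
// provides the locking, so no synchronized block is needed.
final class SchemaCacheHolder {
  // Counts how many times the cache was built, to make sharing observable.
  static final AtomicInteger INIT_COUNT = new AtomicInteger();

  private SchemaCacheHolder() {}

  private static final class Holder {
    static final Map<String, String> CACHE = build();
  }

  private static Map<String, String> build() {
    INIT_COUNT.incrementAndGet();
    // Hypothetical: a real pipeline would load the schemas from storage here.
    return new ConcurrentHashMap<>();
  }

  // Always returns the same instance; never store it in a serialized DoFn field.
  static Map<String, String> get() {
    return Holder.CACHE;
  }
}
```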


On Thu, Jun 15, 2017 at 6:26 AM, Lukasz Cwik  wrote:

> Take a look at DoFn setup/teardown, which are called once per DoFn instance
> rather than once per element, making initialization code easier to write.
>
> Also if the schema map is shared, have you thought of using a single
> static instance of Guava's LoadingCache shared amongst all the DoFn
> instances?
>
> You can also refresh the data stored within the cache periodically.
>
> On Wed, Jun 14, 2017 at 10:39 PM, Kevin Peterson 
> wrote:
>
>> Still gets stuck at the same place :/
>>
>> On Wed, Jun 14, 2017 at 9:45 PM, Tang Jijun (Shanghai_Middle Platform R&D Dept_Data Platform Dept_Basic Data Dept_唐觊隽) <
>> tangji...@yhd.com> wrote:
>>
>>>
>>>
>>> .triggering(
>>>     AfterProcessingTime.pastFirstElementInPane()
>>>         .plusDelayOf(Duration.standardSeconds(1)))
>>> .discardingFiredPanes()
>>> .withAllowedLateness(Duration.ZERO));
>>>
>>>
>>>
>>> Try the trigger above
>>>
>>>
>>>
>>> From: Kevin Peterson [mailto:kevi...@google.com]
>>> Sent: June 15, 2017 2:39
>>> To: user@beam.apache.org
>>> Subject: Fwd: Creating side input map with global window
>>>
>>>
>>>
>>> Hi all,
>>>
>>>
>>>
>>> I am working on a (streaming) pipeline which reads elements from Pubsub,
>>> and schemas for those elements from a separate pubsub topic. I'd like to be
>>> able to create a side input map from the schema topic, and have that
>>> available to the main pipeline for parsing. Each message on the schema
>>> pubsub topic contains all schemas I care about, so for every new message, I
>>> want to generate a new map that will be available to the main pipeline
>>> (eventual consistency is fine). I don't have any windows or triggers on the
>>> main flow, since I really just want each element to be processed as it
>>> arrives, using whatever the latest available schema is.
>>>
>>>
>>>
>>> I am currently trying this with:
>>>
>>>
>>>
>>> PCollection<KV<String, String>> schema = pipeline
>>>     .apply("Read Schema",
>>>         PubsubIO.readStrings().fromTopic("topic_for_schema"))
>>>     .apply(Window.<String>into(new GlobalWindows())
>>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>         .discardingFiredPanes())
>>>     // outputs around 100 elements for each input
>>>     .apply("Create Schema", ParDo.of(new SchemaDirectory.GenerateSchema()));
>>>
>>> PCollectionView<Map<String, String>> schemaView =
>>>     schema.apply(View.asMap());
>>>
>>> pipeline
>>>     .apply("Read Elements",
>>>         PubsubIO.readStrings().fromTopic("topic_for_elements"))
>>>     .apply("Parse Elements",
>>>         ParDo.of(new DoFn<String, TableRow>() {
>>>           @ProcessElement
>>>           public void processElement(ProcessContext c) {
>>>             String name = getNameFromElement(c.element());
>>>             String schema = c.sideInput(schemaView).get(name);
>>>             c.output(parse(c, schema));
>>>           }
>>>         }).withSideInputs(schemaView))
>>>     .apply("Write to Table",
>>>         BigQueryIO.writeTableRows()) // Other BQ options not copied.
>>>
>>> When running this pipeline, the
>>> View.AsMap/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)/GroupByKey
>>> stage never emits any elements, and so the pipeline never progresses. I
>>> can see the messages at the input stage, but nothing appears on the output.
>>>
>>>
>>>
>>> Any advice?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> -Kevin
>>>
>>>
>>>
>>
>>
>
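The original question has two requirements. Each schema message carries the complete set of schemas, and parsing should use whatever map arrived most recently, with eventual consistency acceptable. Outside Beam, those semantics can be illustrated with an atomically swapped, immutable map. This is not how Beam side inputs work internally. `LatestSchemas` is a hypothetical class meant only to pin down the intended behavior, for example in a unit test of the parsing logic or as the value behind a shared static cache like the one discussed above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Holds the most recent complete schema map. Each update replaces the whole
// map at once, so a reader sees either the old map or the new one, never a mix.
final class LatestSchemas {
  private final AtomicReference<Map<String, String>> current =
      new AtomicReference<>(Collections.<String, String>emptyMap());

  // Called once per message on the schema topic (hypothetical wiring).
  void replaceAll(Map<String, String> schemas) {
    // Unmodifiable defensive copy, so later changes by the caller are not visible.
    current.set(Collections.unmodifiableMap(new HashMap<>(schemas)));
  }

  // Called by the parser; returns null if the name is not known yet.
  String schemaFor(String name) {
    return current.get().get(name);
  }
}
```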


Regarding Beam Slack Channel

2017-06-23 Thread Kamil Szewczyk
Hello,

Can someone please add me to the Beam Slack channel?

Thanks.