Re: working with hot keys

2018-02-13 Thread Jacob Marble
On Mon, Feb 12, 2018 at 3:59 PM, Lukasz Cwik <lc...@google.com> wrote:

> The optimization that you have done is that you have forced the V1
> iterable to reside in memory completely since it is now counted as a single
> element. This will fall apart as soon as your V1 iterable exceeds memory.
> Runners like Dataflow allow re-iteration of a GBK/CoGBK result allowing
> for the GBK/CoGBK result to exceed the size of memory and this currently
> only functions at the first level within the value iterable, meaning that
> the entire Iterable is treated as a single value in your
> Join.someJoin. You should see similar performance if you take all the V1s
> out of the CoGBK and "copy" it into an arraylist inside your DoFn and then
> walk the V2 iterable and the in memory array list performing the outer
> join. It will also likely be easier to reason about. Note that Dataflow
> doesn't do this in a great way and causes the re-iteration to happen many
> more times than it should need to, which is why your perf numbers are
> ballooning.
>

Not sure, are you referring to (1) or (2) of my original?

So (2) did fail with full production load. The join has 2+ billion elements
on both sides.


> Alternatively, have you tried putting either of the PCollection<K, V (V1
> or V2)> into a multimap side input and then just doing a GBK on the other
> PCollection<K, V (V2 or V1)> followed by a DoFn that joins the two together
> with the multimap side input?
> The choice of whether V1 or V2 works better in the side input depends on
> the sizes of the relative PCollections and whether the working set of the
> PCollection can be cached in memory (good for side input) or the GBK
> PCollection is sparse enough that if everything is cache miss it won't
> matter.
>

I'll try this. K:V2 has unique keys and doesn't change a lot from
day-to-day, so I'll make that a side input. Should I expect this method to
perform significantly slower than Join.someJoin/CoGBK?
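
For reference, a minimal, untested sketch of the side-input join Lukasz
describes (kv1/kv2 and the K, V1, V2 types are as in the snippets quoted
below; since K:V2 has unique keys, View.asMap() is used here — View.asMultimap()
would be needed if keys repeat):

PCollectionView<Map<K, V2>> v2View =
    kv2.apply("V2 as side input", View.<K, V2>asMap());

kv1.apply("group V1 by key", GroupByKey.<K, V1>create())
   .apply("join with V2 side input",
       ParDo.of(new DoFn<KV<K, Iterable<V1>>, KV<V1, V2>>() {
         @ProcessElement
         public void join(ProcessContext c) {
           // v2 is null when the key has no match (left outer join semantics)
           V2 v2 = c.sideInput(v2View).get(c.element().getKey());
           for (V1 v1 : c.element().getValue()) {
             c.output(KV.of(v1, v2));
           }
         }
       }).withSideInputs(v2View));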

On Mon, Feb 12, 2018 at 1:53 PM, Jacob Marble <jmar...@kochava.com> wrote:
>
>> When joining (Join.leftOuterJoin etc) a PCollection<K, V1> to
>> PCollection<K, V2>, and K:V1 contains hot keys, my pipeline gets very slow.
>> It can bring processing time from hours to days.
>>
>> Reading this blog post
>> <https://cloud.google.com/blog/big-data/2016/02/writing-dataflow-pipelines-with-scalability-in-mind>
>>  I
>> can see some thought has already been given to this problem:
>> "To address this, we allow you to provide extra parallelism hints using
>> the Combine.PerKey.withHotKeyFanout or Combine.Globally.withFanout.
>> These operations will create an extra step in your pipeline to
>> pre-aggregate the data on many machines before performing the final
>> aggregation on the target machines."
>>
>> (1 of 2)
>>
>> These two solutions, Combine.PerKey.withHotKeyFanout or
>> Combine.Globally.withFanout, do not help with a join (CoGBK) operation,
>> however. So, I solved my problem with these stages before and after the
>> join operation, effectively joining K:Iterable with K:V2:
>>
>> kvIterable1 = kv1.apply("GBK to mitigate hot keys", GroupByKey.<K, V1>create())
>>
>> Join.someJoin(kvIterable1, kv2)
>>     .apply(Values.create())
>>     .apply("undo hot key GBK",
>>         ParDo.of(new DoFn<KV<Iterable<V1>, V2>, KV<V1, V2>>() {
>>           @ProcessElement
>>           public void fanout(ProcessContext context) {
>>             for (V1 v1 : context.element().getKey()) {
>>               context.output(KV.of(v1, context.element().getValue()));
>>             }
>>           }
>>         }))
>>
>> Does that look sane to people who have been working with Beam for a long
>> time? It has worked well for us over the last two months or so.
>>
>> (2 of 2)
>>
>> Lately, the size of the value has grown too large. It took some effort to
>> figure out the problem, which manifested as an
>> ArrayIndexOutOfBoundsException emitted from RandomAccessData.write().
>> Here's the follow-up solution, only changing the first half of the above
>> solution:
>>
>> kvIterable1 = kv1
>>     .apply("GBK to mitigate hot keys", GroupByKey.<K, V1>create())
>>     .apply("partition grouped values",
>>         ParDo.of(new DoFn<KV<K, Iterable<V1>>, KV<K, Iterable<V1>>>() {
>>           @ProcessElement
>>           public void partition(ProcessContext context) {
>>             K k = context.element().getKey();
>>             Iterable<V1> v1Iterable = context.element().getValue();
>>             for (List<V1> partition : Iterables.partition(v1Iterable, 100)) {
>>               context.output(KV.<K, Iterable<V1>>of(k, partition));
>>             }
>>           }
>>         }));
>>
>> Again, is this sane? Initial testing suggests this is a good solution.
>>
>> Jacob
>>
>
>


Re: Dependencies and Datastore

2018-01-31 Thread Jacob Marble
Thanks for that, Joshua.

Late last night I moved my BigTable-related code (depending on
beam:beam-sdks-java-io-google-cloud-platform) into a separate pom/project
from GCS-related code (depending on google-cloud-storage).

pom A:

+- org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.2.0:compile
|  +- com.google.api:gax-grpc:jar:0.20.0:compile
|  |  +- io.grpc:grpc-protobuf:jar:1.2.0:compile
|  |  +- com.google.api:api-common:jar:1.1.0:compile
|  |  +- com.google.api:gax:jar:1.3.1:compile
|  |  \- org.threeten:threetenbp:jar:1.3.3:compile

pom B:

|  +- org.apache.beam:beam-sdks-java-core:jar:2.2.0:compile
|  \- org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.2.0:compile
| +- com.google.api:gax-grpc:jar:0.20.0:compile
| |  +- io.grpc:grpc-protobuf:jar:1.2.0:compile
| |  \- org.threeten:threetenbp:jar:1.3.3:compile
...
+- com.google.cloud:google-cloud-storage:jar:1.4.0:compile
|  +- com.google.cloud:google-cloud-core:jar:1.4.0:compile
|  |  +- org.json:json:jar:20160810:compile
|  |  +- com.google.api:api-common:jar:1.1.0:compile
|  |  +- com.google.api:gax:jar:1.5.0:compile

Jacob

On Wed, Jan 31, 2018 at 3:24 AM, Joshua Fox <jos...@freightos.com> wrote:

> The problem does not occur for me now following a dependency update.
>
> Here are the parts of mvn dependency:tree that involve gax
>
>  |  +- org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.2.0:compile
>  |  |  +- com.google.api:gax-grpc:jar:0.20.0:compile
>
> ...
>
>  +- com.google.code.gson:gson:jar:2.8.2:compile
>  \- com.google.api:gax:jar:1.15.0:compile
>
> Those that involve Beam
>
>  +- org.apache.beam:beam-runners-google-cloud-dataflow-java:jar:2.2.0:compile
>  |  +- org.apache.beam:beam-sdks-java-core:jar:2.2.0:compile
>  |  +- org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:jar:2.2.0:compile
>  |  +- org.apache.beam:beam-sdks-common-runner-api:jar:2.2.0:compile
> ...
>
>  |  +- org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.2.0:compile
>  |  |  +- com.google.api:gax-grpc:jar:0.20.0:compile
>
>
> Here are the parts of my pom that involve gax and Beam
>
>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
>   <version>2.2.0</version>
> </dependency>
>
> <dependency>
>   <groupId>com.google.api</groupId>
>   <artifactId>gax</artifactId>
>   <version>1.15.0</version>
> </dependency>
>
>
>
>
> On Wed, Jan 31, 2018 at 2:55 AM, Jacob Marble <jmar...@kochava.com> wrote:
>
>> Josh, what did you do to work around this?
>>
>> This suddenly crept up on a production pipeline yesterday, without
>> anything changing on our side (we do rebuild at every run).
>>
>> Jacob
>>
>> On Fri, Dec 8, 2017 at 6:46 PM, Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>> Created https://issues.apache.org/jira/browse/BEAM-3321 to update
>>> the gax-grpc dependency of Beam.
>>>
>>> - Cham
>>>
>>> On Friday, December 8, 2017 at 6:12:20 PM UTC-8, Harsh Vardhan wrote:
>>>>
>>>> +chamikara@
>>>>
>>>>
>>>> On Friday, December 8, 2017 at 1:31:00 AM UTC-8, Joshua Fox wrote:
>>>>>
>>>>> I use Cloud Datastore API to check for Kinds in the Datastore, then
>>>>> use Dataflow -- now upgrading to Beam -- to copy one Datastore to 
>>>>> another..
>>>>>
>>>>> After adding beam-sdks-java-io-google-cloud-platform to my pom, I
>>>>> start getting this when initializing the Cloud Datastore API
>>>>>
>>>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>>>> com/google/api/gax/retrying/ResultRetryAlgorithm
>>>>> at com.google.cloud.datastore.DatastoreOptions$DefaultDatastore
>>>>> Factory.create(DatastoreOptions.java:51)
>>>>> at com.google.cloud.datastore.DatastoreOptions$DefaultDatastore
>>>>> Factory.create(DatastoreOptions.java:45)
>>>>> at com.google.cloud.ServiceOptions.getService(ServiceOptions.java:426)
>>>>>
>>>>> It is caused by  gax dependencies.
>>>>>
>>>>> Specifically, before I add beam-sdks-java-io-google-cloud-platform there
>>>>> is this  gax dependency
>>>>>
>>>>> +- com.google.cloud:google-cloud-datastore:jar:1.12.0:compile
>>>>> |  +- com.google.cloud:google-cloud-core:jar:1.12.0:compile
>>>>> |  |  +- com.google.api:gax:jar:1.15.0:compile
>>>>>
>>>>>
>>>>>
>>>>> and after I add  it there is this
>>>>>
>>>>> +- org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:
>>>

Re: Dependencies and Datastore

2018-01-30 Thread Jacob Marble
Josh, what did you do to work around this?

This suddenly crept up on a production pipeline yesterday, without anything
changing on our side (we do rebuild at every run).

Jacob

On Fri, Dec 8, 2017 at 6:46 PM, Chamikara Jayalath 
wrote:

> Created https://issues.apache.org/jira/browse/BEAM-3321 to update
> the gax-grpc dependency of Beam.
>
> - Cham
>
> On Friday, December 8, 2017 at 6:12:20 PM UTC-8, Harsh Vardhan wrote:
>>
>> +chamikara@
>>
>>
>> On Friday, December 8, 2017 at 1:31:00 AM UTC-8, Joshua Fox wrote:
>>>
>>> I use the Cloud Datastore API to check for Kinds in the Datastore, then use
>>> Dataflow -- now upgrading to Beam -- to copy one Datastore to another.
>>>
>>> After adding beam-sdks-java-io-google-cloud-platform to my pom, I start
>>> getting this when initializing the Cloud Datastore API
>>>
>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>> com/google/api/gax/retrying/ResultRetryAlgorithm
>>> at com.google.cloud.datastore.DatastoreOptions$DefaultDatastore
>>> Factory.create(DatastoreOptions.java:51)
>>> at com.google.cloud.datastore.DatastoreOptions$DefaultDatastore
>>> Factory.create(DatastoreOptions.java:45)
>>> at com.google.cloud.ServiceOptions.getService(ServiceOptions.java:426)
>>>
>>> It is caused by  gax dependencies.
>>>
>>> Specifically, before I add beam-sdks-java-io-google-cloud-platform there
>>> is this  gax dependency
>>>
>>> +- com.google.cloud:google-cloud-datastore:jar:1.12.0:compile
>>> |  +- com.google.cloud:google-cloud-core:jar:1.12.0:compile
>>> |  |  +- com.google.api:gax:jar:1.15.0:compile
>>>
>>>
>>>
>>> and after I add  it there is this
>>>
>>> +- org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.2.0:compile
>>> |  +- com.google.api:gax-grpc:jar:0.20.0:compile
>>> |  |  +- com.google.api:gax:jar:1.3.1:compile
>>>
>>>
>>> If I add gax 1.15.0 to my pom explicitly, I get
>>>
>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>> com/google/api/gax/retrying/ExceptionRetryAlgorithm
>>> at java.lang.ClassLoader.defineClass1(Native Method)
>>> ..
>>> at com.google.cloud.BaseService.(BaseService.java:48)
>>> at com.google.cloud.datastore.DatastoreOptions$DefaultDatastore
>>> Factory.create(DatastoreOptions.java:51)
>>> at com.google.cloud.datastore.DatastoreOptions$DefaultDatastore
>>> Factory.create(DatastoreOptions.java:45)
>>> at com.google.cloud.ServiceOptions.getService(ServiceOptions.java:426)
>>>
>>> Clearly Datastore and Beam should work together. Yet there have been
>>> dependency problems between the two for  a while. See this discussion
>>>  from 1 year ago.
>>>
>>> How can I resolve this?
>>>
>>


Troubleshooting live Java process in Dataflow

2017-12-07 Thread Jacob Marble
In this post, Reuven says:
"we had to ssh to the VMs to get actual thread profiles from workers. After
a bit of digging, we found threads were often stuck in the following stack
trace"

Can someone describe what tools you use to do this?

I logged into a Dataflow runner, found that the gc is thrashing. Neat! Now
I'm trying to get a thread dump. Looks like the Dataflow runner is actually
running in a container within the VM, and the host VM doesn't have jmap or
any j* utility installed.

Tried kill -3, didn't seem to trigger a thread dump.

Also found an open JMX port, but connecting to it just hangs VisualVM and JConsole.

Jacob


Re: @DoFn.Setup not called

2017-11-21 Thread Jacob Marble
Cool! Thanks Kenn.

Jacob

On Mon, Nov 20, 2017 at 9:57 AM, Kenneth Knowles <k...@google.com> wrote:

> I wanted to follow up that this has been reproduced and diagnosed, and a
> fix is underway. The ticket to follow is https://issues.apache.org/
> jira/browse/BEAM-3219.
>
> Kenn
>
> On Fri, Nov 17, 2017 at 12:23 PM, Jacob Marble <jmar...@kochava.com>
> wrote:
>
>> Here is a small pipeline job that fails using the Dataflow runner, but
>> doesn't fail using the direct runner.
>>
>> https://gist.github.com/jacobmarble/804c2edb9c80a2863f3e671d6851a55f
>>
>> Jacob
>>
>> On Fri, Nov 17, 2017 at 9:27 AM, Kenneth Knowles <k...@google.com> wrote:
>>
>>> It is definitely a big deal if @Setup is not getting called! There are
>>> no special cases that would skip @Setup. Please do report what you can.
>>>
>>> That said, lazily doing setup (via null check or some such as you
>>> mention) is perfectly fine and often a more robust programming pattern.
>>> Upside: you can't accidentally use uninitialized things. Downside: it might
>>> mask repeated initialization and only manifest as poor performance.
>>>
>>> Kenn
>>>
>>> On Fri, Nov 17, 2017 at 9:00 AM, Jacob Marble <jmar...@kochava.com>
>>> wrote:
>>>
>>>> I tried to write a simpler DoFn that induces the error, but it works
>>>> fine. Working around the issue today by using @StartBundle with a null
>>>> check, and that seems to be working.
>>>>
>>>> If this really is a big deal, then it needs to be reported, so I'll try
>>>> to find time to write a broken example.
>>>>
>>>> Jacob
>>>>
>>>> On Thu, Nov 16, 2017 at 10:27 PM, Eugene Kirpichov <
>>>> kirpic...@google.com> wrote:
>>>>
>>>>> Could you give more details, e.g. a code snippet that reproduces the
>>>>> issue, and describe how you determine that @Setup hasn't been called?
>>>>>
>>>>> On Thu, Nov 16, 2017 at 6:58 PM Derek Hao Hu <phoenixin...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> ​I've been using DoFn.Setup method in Dataflow and it seems to be
>>>>>> working fine.​
>>>>>>
>>>>>> On Thu, Nov 16, 2017 at 4:56 PM, Jacob Marble <jmar...@kochava.com>
>>>>>> wrote:
>>>>>>
>>>>>>> This one is weird.
>>>>>>>
>>>>>>> A DoFn I wrote:
>>>>>>> - stateful
>>>>>>> - used plenty in a streaming pipeline
>>>>>>> - direct and dataflow runners
>>>>>>> - works fine
>>>>>>>
>>>>>>> Now:
>>>>>>> - new batch pipeline
>>>>>>> - @DoFn.Setup method not called
>>>>>>> - direct runner works properly (logs from setup method are output)
>>>>>>> - dataflow runner simply doesn't call the setup method
>>>>>>>
>>>>>>> Is this possibly a Beam misuse? Javadoc for DoFn.Setup doesn't hint
>>>>>>> at anything, so I'm suspecting Dataflow bug?
>>>>>>>
>>>>>>> Jacob
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Derek Hao Hu
>>>>>>
>>>>>> Software Engineer | Snapchat
>>>>>> Snap Inc.
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: unique DoFn id

2017-11-19 Thread Jacob Marble
That helps, thanks.
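
For reference, a minimal sketch of the approach discussed below — assigning
the per-instance id in @Setup rather than in the constructor (class and field
names are illustrative):

import java.util.UUID;
import org.apache.beam.sdk.transforms.DoFn;

class InstanceTaggedFn extends DoFn<String, String> {
  // transient: not serialized with the DoFn, assigned on the worker instead
  private transient String instanceId;

  @Setup
  public void setup() {
    instanceId = UUID.randomUUID().toString();
  }

  @ProcessElement
  public void process(ProcessContext c) {
    c.output(instanceId + ":" + c.element());
  }
}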

On Sun, Nov 19, 2017 at 7:29 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> That's correct. DoFns are serialized in the pipeline description and
> shipped to workers and deserialized there. Standard Java serialization is
> used, and Java serialization doesn't call the constructor - it directly
> creates an instance of the class (even if it doesn't declare a default
> constructor) and repopulates fields.
>
> On Sun, Nov 19, 2017, 7:07 PM Jacob Marble <jmar...@kochava.com> wrote:
>
>> Eugene, that worked. Can you explain why this doesn't work when I set the
>> UUID (or random value) from the constructor?
>>
>> It looks like the DoFn constructor is called once by the worker, then
>> that constructed object is copied as many times as needed, each instance
>> getting its own thread and @Setup, @StartBundle, etc. loop. Is that correct?
>>
>> Thanks for the help.
>>
>> Jacob
>>
>> On Sun, Nov 19, 2017 at 10:24 AM, Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> You could create a private variable with a UUID, filled in in @Setup or
>>> (if you're hitting that bug where @Setup wasn't being called) in
>>> readObject()?
>>>
>>> On Sun, Nov 19, 2017 at 8:17 AM Jacob Marble <jmar...@kochava.com>
>>> wrote:
>>>
>>>> Is there a recommended way to get a unique id for each instance of a
>>>> DoFn?
>>>>
>>>> - DataflowWorkerHarnessOptions.getWorkerId() only returns a unique id
>>>> per worker, which can contain multiple instances of a DoFn.
>>>> - Looks like ThreadLocalRandom is seeded with the same value on every
>>>> instance
>>>> - Thinking I'll try workerId + construction timestamp next
>>>>
>>>> Jacob
>>>>
>>>
>> --
Jacob


Re: @DoFn.Setup not called

2017-11-17 Thread Jacob Marble
I also notice that stateful DoFns seem to only be instantiated once in
Dataflow, but multiple instances do end up being created in the direct
runner. Is there a story behind that?

Jacob

On Fri, Nov 17, 2017 at 7:22 PM, Jacob Marble <jmar...@kochava.com> wrote:

> Noticing some related and unexpected differences between batch and
> streaming pipelines.
>
> Why does a stateful DoFn behave like GroupByKey (no data output until all
> data input is complete) in a batch pipeline, but not in a streaming
> pipeline? It looks like BatchStatefulParDoOverrides has something to do
> with it, but I can't figure out why, or how to work around it.
>
> In this current project:
> 1) read 500 million elements
> 2) very slow stateful DoFn (rate limited API calls)
> 3) write results
>
> To complete step 1 in a reasonable time, multiple workers are required,
> but Dataflow's autoscaling doesn't reduce the worker quantity when step 1
> completes. Since step 2 doesn't speed up with more workers, it would be
> best if it could start as soon as step 1 starts. This way, the job
> completes faster and uses fewer resources.
>
> Jacob
>
> On Fri, Nov 17, 2017 at 12:23 PM, Jacob Marble <jmar...@kochava.com>
> wrote:
>
>> Here is a small pipeline job that fails using the Dataflow runner, but
>> doesn't fail using the direct runner.
>>
>> https://gist.github.com/jacobmarble/804c2edb9c80a2863f3e671d6851a55f
>>
>> Jacob
>>
>> On Fri, Nov 17, 2017 at 9:27 AM, Kenneth Knowles <k...@google.com> wrote:
>>
>>> It is definitely a big deal if @Setup is not getting called! There are
>>> no special cases that would skip @Setup. Please do report what you can.
>>>
>>> That said, lazily doing setup (via null check or some such as you
>>> mention) is perfectly fine and often a more robust programming pattern.
>>> Upside: you can't accidentally use uninitialized things. Downside: it might
>>> mask repeated initialization and only manifest as poor performance.
>>>
>>> Kenn
>>>
>>> On Fri, Nov 17, 2017 at 9:00 AM, Jacob Marble <jmar...@kochava.com>
>>> wrote:
>>>
>>>> I tried to write a simpler DoFn that induces the error, but it works
>>>> fine. Working around the issue today by using @StartBundle with a null
>>>> check, and that seems to be working.
>>>>
>>>> If this really is a big deal, then it needs to be reported, so I'll try
>>>> to find time to write a broken example.
>>>>
>>>> Jacob
>>>>
>>>> On Thu, Nov 16, 2017 at 10:27 PM, Eugene Kirpichov <
>>>> kirpic...@google.com> wrote:
>>>>
>>>>> Could you give more details, e.g. a code snippet that reproduces the
>>>>> issue, and describe how you determine that @Setup hasn't been called?
>>>>>
>>>>> On Thu, Nov 16, 2017 at 6:58 PM Derek Hao Hu <phoenixin...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> ​I've been using DoFn.Setup method in Dataflow and it seems to be
>>>>>> working fine.​
>>>>>>
>>>>>> On Thu, Nov 16, 2017 at 4:56 PM, Jacob Marble <jmar...@kochava.com>
>>>>>> wrote:
>>>>>>
>>>>>>> This one is weird.
>>>>>>>
>>>>>>> A DoFn I wrote:
>>>>>>> - stateful
>>>>>>> - used plenty in a streaming pipeline
>>>>>>> - direct and dataflow runners
>>>>>>> - works fine
>>>>>>>
>>>>>>> Now:
>>>>>>> - new batch pipeline
>>>>>>> - @DoFn.Setup method not called
>>>>>>> - direct runner works properly (logs from setup method are output)
>>>>>>> - dataflow runner simply doesn't call the setup method
>>>>>>>
>>>>>>> Is this possibly a Beam misuse? Javadoc for DoFn.Setup doesn't hint
>>>>>>> at anything, so I'm suspecting Dataflow bug?
>>>>>>>
>>>>>>> Jacob
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Derek Hao Hu
>>>>>>
>>>>>> Software Engineer | Snapchat
>>>>>> Snap Inc.
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: @DoFn.Setup not called

2017-11-17 Thread Jacob Marble
Noticing some related and unexpected differences between batch and
streaming pipelines.

Why does a stateful DoFn behave like GroupByKey (no data output until all
data input is complete) in a batch pipeline, but not in a streaming
pipeline? It looks like BatchStatefulParDoOverrides has something to do
with it, but I can't figure out why, or how to work around it.

In this current project:
1) read 500 million elements
2) very slow stateful DoFn (rate limited API calls)
3) write results

To complete step 1 in a reasonable time, multiple workers are required, but
Dataflow's autoscaling doesn't reduce the worker quantity when step 1
completes. Since step 2 doesn't speed up with more workers, it would be
best if it could start as soon as step 1 starts. This way, the job
completes faster and uses fewer resources.

Jacob

On Fri, Nov 17, 2017 at 12:23 PM, Jacob Marble <jmar...@kochava.com> wrote:

> Here is a small pipeline job that fails using the Dataflow runner, but
> doesn't fail using the direct runner.
>
> https://gist.github.com/jacobmarble/804c2edb9c80a2863f3e671d6851a55f
>
> Jacob
>
> On Fri, Nov 17, 2017 at 9:27 AM, Kenneth Knowles <k...@google.com> wrote:
>
>> It is definitely a big deal if @Setup is not getting called! There are no
>> special cases that would skip @Setup. Please do report what you can.
>>
>> That said, lazily doing setup (via null check or some such as you
>> mention) is perfectly fine and often a more robust programming pattern.
>> Upside: you can't accidentally use uninitialized things. Downside: it might
>> mask repeated initialization and only manifest as poor performance.
>>
>> Kenn
>>
>> On Fri, Nov 17, 2017 at 9:00 AM, Jacob Marble <jmar...@kochava.com>
>> wrote:
>>
>>> I tried to write a simpler DoFn that induces the error, but it works
>>> fine. Working around the issue today by using @StartBundle with a null
>>> check, and that seems to be working.
>>>
>>> If this really is a big deal, then it needs to be reported, so I'll try
>>> to find time to write a broken example.
>>>
>>> Jacob
>>>
>>> On Thu, Nov 16, 2017 at 10:27 PM, Eugene Kirpichov <kirpic...@google.com
>>> > wrote:
>>>
>>>> Could you give more details, e.g. a code snippet that reproduces the
>>>> issue, and describe how you determine that @Setup hasn't been called?
>>>>
>>>> On Thu, Nov 16, 2017 at 6:58 PM Derek Hao Hu <phoenixin...@gmail.com>
>>>> wrote:
>>>>
>>>>> ​I've been using DoFn.Setup method in Dataflow and it seems to be
>>>>> working fine.​
>>>>>
>>>>> On Thu, Nov 16, 2017 at 4:56 PM, Jacob Marble <jmar...@kochava.com>
>>>>> wrote:
>>>>>
>>>>>> This one is weird.
>>>>>>
>>>>>> A DoFn I wrote:
>>>>>> - stateful
>>>>>> - used plenty in a streaming pipeline
>>>>>> - direct and dataflow runners
>>>>>> - works fine
>>>>>>
>>>>>> Now:
>>>>>> - new batch pipeline
>>>>>> - @DoFn.Setup method not called
>>>>>> - direct runner works properly (logs from setup method are output)
>>>>>> - dataflow runner simply doesn't call the setup method
>>>>>>
>>>>>> Is this possibly a Beam misuse? Javadoc for DoFn.Setup doesn't hint
>>>>>> at anything, so I'm suspecting Dataflow bug?
>>>>>>
>>>>>> Jacob
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Derek Hao Hu
>>>>>
>>>>> Software Engineer | Snapchat
>>>>> Snap Inc.
>>>>>
>>>>
>>>
>>
>


Re: @DoFn.Setup not called

2017-11-17 Thread Jacob Marble
Here is a small pipeline job that fails using the Dataflow runner, but
doesn't fail using the direct runner.

https://gist.github.com/jacobmarble/804c2edb9c80a2863f3e671d6851a55f

Jacob

On Fri, Nov 17, 2017 at 9:27 AM, Kenneth Knowles <k...@google.com> wrote:

> It is definitely a big deal if @Setup is not getting called! There are no
> special cases that would skip @Setup. Please do report what you can.
>
> That said, lazily doing setup (via null check or some such as you mention)
> is perfectly fine and often a more robust programming pattern. Upside: you
> can't accidentally use uninitialized things. Downside: it might mask
> repeated initialization and only manifest as poor performance.
>
> Kenn
>
> On Fri, Nov 17, 2017 at 9:00 AM, Jacob Marble <jmar...@kochava.com> wrote:
>
>> I tried to write a simpler DoFn that induces the error, but it works
>> fine. Working around the issue today by using @StartBundle with a null
>> check, and that seems to be working.
>>
>> If this really is a big deal, then it needs to be reported, so I'll try
>> to find time to write a broken example.
>>
>> Jacob
>>
>> On Thu, Nov 16, 2017 at 10:27 PM, Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> Could you give more details, e.g. a code snippet that reproduces the
>>> issue, and describe how you determine that @Setup hasn't been called?
>>>
>>> On Thu, Nov 16, 2017 at 6:58 PM Derek Hao Hu <phoenixin...@gmail.com>
>>> wrote:
>>>
>>>> ​I've been using DoFn.Setup method in Dataflow and it seems to be
>>>> working fine.​
>>>>
>>>> On Thu, Nov 16, 2017 at 4:56 PM, Jacob Marble <jmar...@kochava.com>
>>>> wrote:
>>>>
>>>>> This one is weird.
>>>>>
>>>>> A DoFn I wrote:
>>>>> - stateful
>>>>> - used plenty in a streaming pipeline
>>>>> - direct and dataflow runners
>>>>> - works fine
>>>>>
>>>>> Now:
>>>>> - new batch pipeline
>>>>> - @DoFn.Setup method not called
>>>>> - direct runner works properly (logs from setup method are output)
>>>>> - dataflow runner simply doesn't call the setup method
>>>>>
>>>>> Is this possibly a Beam misuse? Javadoc for DoFn.Setup doesn't hint at
>>>>> anything, so I'm suspecting Dataflow bug?
>>>>>
>>>>> Jacob
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Derek Hao Hu
>>>>
>>>> Software Engineer | Snapchat
>>>> Snap Inc.
>>>>
>>>
>>
>


Re: @DoFn.Setup not called

2017-11-17 Thread Jacob Marble
I tried to write a simpler DoFn that induces the error, but it works fine.
Working around the issue today by using @StartBundle with a null check, and
that seems to be working.
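
A minimal sketch of that workaround (MyClient is a placeholder for whatever
@Setup was supposed to initialize):

class CallApiFn extends DoFn<String, String> {
  private transient MyClient client;  // MyClient is illustrative

  @StartBundle
  public void startBundle() {
    // lazy init: guards against @Setup not having been called
    if (client == null) {
      client = MyClient.create();
    }
  }

  @ProcessElement
  public void process(ProcessContext c) {
    c.output(client.call(c.element()));
  }
}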

If this really is a big deal, then it needs to be reported, so I'll try to
find time to write a broken example.

Jacob

On Thu, Nov 16, 2017 at 10:27 PM, Eugene Kirpichov <kirpic...@google.com>
wrote:

> Could you give more details, e.g. a code snippet that reproduces the
> issue, and describe how you determine that @Setup hasn't been called?
>
> On Thu, Nov 16, 2017 at 6:58 PM Derek Hao Hu <phoenixin...@gmail.com>
> wrote:
>
>> ​I've been using DoFn.Setup method in Dataflow and it seems to be working
>> fine.​
>>
>> On Thu, Nov 16, 2017 at 4:56 PM, Jacob Marble <jmar...@kochava.com>
>> wrote:
>>
>>> This one is weird.
>>>
>>> A DoFn I wrote:
>>> - stateful
>>> - used plenty in a streaming pipeline
>>> - direct and dataflow runners
>>> - works fine
>>>
>>> Now:
>>> - new batch pipeline
>>> - @DoFn.Setup method not called
>>> - direct runner works properly (logs from setup method are output)
>>> - dataflow runner simply doesn't call the setup method
>>>
>>> Is this possibly a Beam misuse? Javadoc for DoFn.Setup doesn't hint at
>>> anything, so I'm suspecting Dataflow bug?
>>>
>>> Jacob
>>>
>>
>>
>>
>> --
>> Derek Hao Hu
>>
>> Software Engineer | Snapchat
>> Snap Inc.
>>
>


Re: Slack Channel

2017-11-16 Thread Jacob Marble
Me too, if you don't mind.

Jacob

On Thu, Nov 9, 2017 at 2:09 PM, Lukasz Cwik  wrote:

> Invite sent, welcome.
>
> On Thu, Nov 9, 2017 at 2:08 PM, Fred Tsang  wrote:
>
>> Hi,
>>
>> Please add me to the slack channel.
>>
>> Thanks,
>> Fred
>>
>> Ps. I think "BeamTV" would be a great YouTube channel ;)
>>
>
>


Re: [VOTE] Choose the "new" Spark runner

2017-11-16 Thread Jacob Marble
[ ] Use Spark 1 & Spark 2 Support Branch
[X] Use Spark 2 Only Branch

Spark 2 has been out for a while, so probably not going to offend many
people.

Jacob

On Thu, Nov 16, 2017 at 5:45 AM, Neville Dipale 
wrote:

> [ ] Use Spark 1 & Spark 2 Support Branch
> [X] Use Spark 2 Only Branch
>
> On 16 November 2017 at 15:08, Jean-Baptiste Onofré 
> wrote:
>
>> Hi guys,
>>
>> To illustrate the current discussion about Spark versions support, you
>> can take a look on:
>>
>> --
>> Spark 1 & Spark 2 Support Branch
>>
>> https://github.com/jbonofre/beam/tree/BEAM-1920-SPARK2-MODULES
>>
>> This branch contains a Spark runner common module compatible with both
>> Spark 1.x and 2.x. For convenience, we introduced spark1 & spark2
>> modules/artifacts containing just a pom.xml to define the dependencies set.
>>
>> --
>> Spark 2 Only Branch
>>
>> https://github.com/jbonofre/beam/tree/BEAM-1920-SPARK2-ONLY
>>
>> This branch is an upgrade to Spark 2.x and "drop" support of Spark 1.x.
>>
>> As I'm ready to merge one or the other in the PR, I would like to
>> complete the vote/discussion pretty soon.
>>
>> Correct me if I'm wrong, but it seems that the preference is to drop
>> Spark 1.x to focus only on Spark 2.x (for the Spark 2 Only Branch).
>>
>> I would like to call a final vote to act the merge I will do:
>>
>> [ ] Use Spark 1 & Spark 2 Support Branch
>> [ ] Use Spark 2 Only Branch
>>
>> This informal vote is open for 48 hours.
>>
>> Please, let me know what your preference is.
>>
>> Thanks !
>> Regards
>> JB
>>
>> On 11/13/2017 09:32 AM, Jean-Baptiste Onofré wrote:
>>
>>> Hi Beamers,
>>>
>>> I'm forwarding this discussion & vote from the dev mailing list to the
>>> user mailing list.
>>> The goal is to have your feedback as user.
>>>
>>> Basically, we have two options:
>>> 1. Right now, in the PR, we support both Spark 1.x and 2.x using three
>>> artifacts (common, spark1, spark2). You, as users, pick up spark1 or spark2
>>> in your dependencies set depending on the Spark target version you want.
>>> 2. The other option is to upgrade and focus on Spark 2.x in Beam 2.3.0.
>>> If you still want to use Spark 1.x, then you will be stuck at Beam 2.2.0
>>> or earlier.
>>>
>>> Thoughts ?
>>>
>>> Thanks !
>>> Regards
>>> JB
>>>
>>>
>>>  Forwarded Message 
>>> Subject: [VOTE] Drop Spark 1.x support to focus on Spark 2.x
>>> Date: Wed, 8 Nov 2017 08:27:58 +0100
>>> From: Jean-Baptiste Onofré 
>>> Reply-To: d...@beam.apache.org
>>> To: d...@beam.apache.org
>>>
>>> Hi all,
>>>
>>> as you might know, we are working on Spark 2.x support in the Spark
>>> runner.
>>>
>>> I'm working on a PR about that:
>>>
>>> https://github.com/apache/beam/pull/3808
>>>
>>> Today, we have something working with both Spark 1.x and 2.x from a code
>>> standpoint, but I have to deal with dependencies. It's the first step of
>>> the update as I'm still using RDD, the second step would be to support
>>> dataframe (but for that, I would need PCollection elements with schemas,
>>> that's another topic on which Eugene, Reuven and I are discussing).
>>>
>>> However, as all major distributions now ship Spark 2.x, I don't think
>>> it's required anymore to support Spark 1.x.
>>>
>>> If we agree, I will update and cleanup the PR to only support and focus
>>> on Spark 2.x.
>>>
>>> So, that's why I'm calling for a vote:
>>>
>>>[ ] +1 to drop Spark 1.x support and upgrade to Spark 2.x only
>>>[ ] 0 (I don't care ;))
>>>[ ] -1, I would like to still support Spark 1.x, and so having
>>> support of both Spark 1.x and 2.x (please provide specific comment)
>>>
>>> This vote is open for 48 hours (I have the commits ready, just waiting
>>> the end of the vote to push on the PR).
>>>
>>> Thanks !
>>> Regards
>>> JB
>>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>
>


@DoFn.Setup not called

2017-11-16 Thread Jacob Marble
This one is weird.

A DoFn I wrote:
- stateful
- used plenty in a streaming pipeline
- direct and dataflow runners
- works fine

Now:
- new batch pipeline
- @DoFn.Setup method not called
- direct runner works properly (logs from setup method are output)
- dataflow runner simply doesn't call the setup method

Is this possibly a Beam misuse? Javadoc for DoFn.Setup doesn't hint at
anything, so I'm suspecting Dataflow bug?

Jacob


Re: Windowing in a batch pipeline

2017-11-09 Thread Jacob Marble
Thanks Robert, here's what I did with your advice.

After the early Join/GBK transformations:

collection
 .apply(WithTimestamps.of(...)
     .withAllowedTimestampSkew(new Duration(Long.MAX_VALUE / 10)))
 .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(10)))
     .withAllowedLateness(new Duration(Long.MAX_VALUE / 10))
     .discardingFiredPanes())
 .apply(WithKeys.of(...))
 .apply(Combine.perKey(...))
 .apply(TextIO.write()...);

Long.MAX_VALUE/10 to prevent an overflow error; Long.MAX_VALUE/2 also works.
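
An alternative sketch, per Robert's note below about DoFn#getAllowedTimestampSkew:
override the skew on the timestamping DoFn itself instead of using WithTimestamps
(Foo and getActivityUnixSeconds() are as in the original snippet):

collection.apply("add window timestamp",
 ParDo.of(new DoFn<Foo, Foo>() {
   @Override
   public Duration getAllowedTimestampSkew() {
     return new Duration(Long.MAX_VALUE / 10);
   }

   @ProcessElement
   public void map(ProcessContext context) {
     Foo element = context.element();
     context.outputWithTimestamp(element,
         new Instant(element.getActivityUnixSeconds() * 1000));
   }
 }));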

Jacob

On Wed, Nov 8, 2017 at 5:54 PM, Robert Bradshaw <rober...@google.com> wrote:

> On Wed, Nov 8, 2017 at 5:33 PM, Jacob Marble <jmar...@kochava.com> wrote:
> > Good evening. I'm trying to nail down windowing. The concept is clear,
> just
> > struggling with writing a working pipeline. Tonight the goal is group
> events
> > by key and window, in a batch pipeline. All data is "late" because it's a
> > batch pipeline, and I expect nothing to be dropped or processed in a
> "late"
> > context.
>
> Traditionally, in a batch pipeline we consider no data to be late, as
> we have perfect knowledge of the watermark.
>
> > Read section 7 and 8 of the Beam Programming Guide roughly twice.
> > Sifted through the examples, WindowedWordCount is close, but it doesn't
> use
> > triggering, which is where (2b) is probably off track.
> >
> > 1)
> > PCollection is created through a series of transforms, including a
> > Join.leftOuterJoin(). Apply a timestamp with something simple:
> >
> > collection.apply("add window timestamp",
> >  ParDo.of(new DoFn<Foo, Foo>() {
> >   @ProcessElement
> >   public void map(ProcessContext context) {
> >Foo element = context.element();
> >Instant timestamp = new Instant(element.getActivityUnixSeconds() *
> 1000);
> >context.outputWithTimestamp(element, timestamp);
> >   }
> >  }));
> >
> > This fails with "java.lang.IllegalArgumentException: Cannot output with
> > timestamp 2017-04-01T00:00:00.000Z. Output timestamps must be no earlier
> > than the timestamp of the current input (294247-01-09T04:00:54.775Z)
> minus
> > the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
> > Javadoc for details on changing the allowed skew."
> >
> > Is this expected? I don't care about skew, just want to set the timestamp
> > per element.
> >
> > I worked around this by applying the timestamp earlier in the pipeline,
> > right after a TextIO.read(). Why does that fix the problem?
>
> I would suspect that very-far-in-the-future timestamp is the end of
> the global window, set as the timestamp as the result of a
> group-by-key.
>
> You can set your timestamps earlier, as you have done, but in this
> case they will get reset after passing through any GBK. It's possible
> you could get what you want by setting TimestampCombiner to EARLIEST
> (see https://github.com/apache/beam/blob/v2.1.1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java#L47)
> but probably the right solution is to set the allowed timestamp skew
> to infinity (or Long.MAX_VALUE or similar).
>
> Generally this skew is needed in streaming to hold the watermark back
> the right amount... Definitely not intuitive in your case; we should
> think if there's something better we could do here.
>
> > 2a)
> > After applying the timestamp, let's window!
> >
> > collection.apply("window into sessions",
> >  Window.into(Sessions.withGapDuration(Duration.
> standardMinutes(10
> >  .apply("key by something, reduce")
> >  .apply(TextIO.write()...)
> >
> > Now I see an output file, what joy! But the output file is empty.
> Confirmed
> > that the PCollection feeding TextIO.write() is seeing data. Maybe this is
> > because the default trigger is incorrect for my use case? I expected not
> to
> > need triggering in batch context, but the DefaultTrigger Javadoc makes me
> > believe otherwise.
> >
> > 2b)
> > How about the Never.ever() trigger? Javadoc: "Using this trigger will
> only
> > produce output when the watermark passes the end of the {@link
> BoundedWindow
> > window}". I don't know, but let's try. There's some error about allowed
> > lateness and firing panes, so I'll try values that look standard:
> >
> > collection.apply("window into sessions",
> >  Window.into(Sessions.withGapDuration(Duration.
> standardMinutes(10)))
> >
> > .triggering(Never.ever()).withAllowedLateness(Duration.standardDays(1)).
> discardingFiredPanes())
> >  .apply("key by something, reduce")
> >  .apply(TextIO.write()...)
> >
> > This yields a new error:
> > "java.lang.IllegalStateException: TimestampCombiner moved element from
> > 294247-01-09T04:10:54.774Z to earlier time 294247-01-09T04:00:54.775Z
> (end
> > of global window) for window
> > org.apache.beam.sdk.transforms.windowing.GlobalWindow"
> >
> > So I'm probably looking in the wrong place.
>
> I think if you resolve the issues above than this will take care of itself.
>
> - Robert
>


Windowing in a batch pipeline

2017-11-08 Thread Jacob Marble
Good evening. I'm trying to nail down windowing. The concept is clear, just
struggling with writing a working pipeline. Tonight the goal is group
events by key and window, in a batch pipeline. All data is "late" because
it's a batch pipeline, and I expect nothing to be dropped or processed in a
"late" context.

Read section 7 and 8 of the Beam Programming Guide roughly twice.
Sifted through the examples, WindowedWordCount is close, but it doesn't use
triggering, which is where (2b) is probably off track.

1)
PCollection is created through a series of transforms, including a
Join.leftOuterJoin(). Apply a timestamp with something simple:

collection.apply("add window timestamp",
 ParDo.of(new DoFn<Foo, Foo>() {
  @ProcessElement
  public void map(ProcessContext context) {
   Foo element = context.element();
   Instant timestamp = new Instant(element.getActivityUnixSeconds() * 1000);
   context.outputWithTimestamp(element, timestamp);
  }
 }));

This fails with "java.lang.IllegalArgumentException: Cannot output with
timestamp 2017-04-01T00:00:00.000Z. Output timestamps must be no earlier
than the timestamp of the current input (294247-01-09T04:00:54.775Z) minus
the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
Javadoc for details on changing the allowed skew."

*Is this expected? I don't care about skew, just want to set the timestamp
per element.*

I worked around this by applying the timestamp earlier in the pipeline,
right after a TextIO.read(). Why does that fix the problem?

2a)
After applying the timestamp, let's window!

collection.apply("window into sessions",
 Window.into(Sessions.withGapDuration(Duration.standardMinutes(10
 .apply("key by something, reduce")
 .apply(TextIO.write()...)

Now I see an output file, what joy! *But the output file is empty.* Confirmed
that the PCollection feeding TextIO.write() is seeing data. Maybe this is
because the default trigger is incorrect for my use case? I expected not to
need triggering in batch context, but the DefaultTrigger Javadoc makes me
believe otherwise.

2b)
How about the Never.ever() trigger? Javadoc: "Using this trigger will only
produce output when the watermark passes the end of the {@link
BoundedWindow window}". I don't know, but let's try. There's some error
about allowed lateness and firing panes, so I'll try values that look
standard:

collection.apply("window into sessions",
 Window.into(Sessions.withGapDuration(Duration.standardMinutes(10)))
  .triggering(Never.ever()).withAllowedLateness(Duration.stand
ardDays(1)).discardingFiredPanes())
 .apply("key by something, reduce")
 .apply(TextIO.write()...)

This yields a new error:
"java.lang.IllegalStateException: TimestampCombiner moved element from
294247-01-09T04:10:54.774Z to earlier time 294247-01-09T04:00:54.775Z (end
of global window) for window org.apache.beam.sdk.transforms.windowing.GlobalWindow"

So I'm probably looking in the wrong place.

Thanks!

Jacob


DoFn.OnTimer thread safe?

2017-10-30 Thread Jacob Marble
In a DoFn instance, is an OnTimer method always called when no
ProcessElement method is active?

Jacob


Re: "processing lull"

2017-10-29 Thread Jacob Marble
- Upstream steps seem to be slowed down by this PTransform (system lag up
and elements/sec down)
- Unbounded source is PubSubIO

Jacob

On Sun, Oct 29, 2017 at 9:22 PM, Jacob Marble <jmar...@kochava.com> wrote:

> - This does not happen when I don't use the reshuffle hack
> - HTTP QPS seems to be improved with reshuffling, but also seems to
> burst-and-pause
> - The "processing lull" log entry occurs 4 times every 5 minutes
> - Right now, I'm guessing that "processing lull" means that a map
> operation is taking too long
>
> Jacob
>
> On Sun, Oct 29, 2017 at 9:15 PM, Jacob Marble <jmar...@kochava.com> wrote:
>
>> Good evening-
>>
>> What should I make of the log warning "processing lull for [instant] in
>> state windmill-read" ?
>>
>> - This happens in a streaming pipeline, in Dataflow.
>> - The DoFn that emits the log entry makes HTTP requests to a third-party.
>> - This only happens when I added a side input to the PTransform, to
>> prevent fusing.
>> - The side input is a SingletonView, just an empty string value.
>>
>> Thanks as usual,
>>
>> Jacob
>>
>> Processing lull for PT300.124S in state windmill-read of [step name]
>>   at sun.misc.Unsafe.park(Native Method)
>>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>   at com.google.cloud.dataflow.worker.repackaged.com.google.commo
>> n.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)
>>   at com.google.cloud.dataflow.worker.repackaged.com.google.commo
>> n.util.concurrent.AbstractFuture$TrustedFuture.get(
>> AbstractFuture.java:76)
>>   at com.google.cloud.dataflow.worker.MetricTrackingWindmillServe
>> rStub.getStateData(MetricTrackingWindmillServerStub.java:188)
>>   at com.google.cloud.dataflow.worker.WindmillStateReader.startBa
>> tchAndBlock(WindmillStateReader.java:405)
>>   at com.google.cloud.dataflow.worker.WindmillStateReader$Wrapped
>> Future.get(WindmillStateReader.java:306)
>>   at com.google.cloud.dataflow.worker.WindmillStateInternals$Wind
>> millValue.read(WindmillStateInternals.java:384)
>>   at com.google.cloud.dataflow.worker.StreamingSideInputFetcher.b
>> lockedMap(StreamingSideInputFetcher.java:249)
>>   at com.google.cloud.dataflow.worker.StreamingSideInputFetcher.s
>> toreIfBlocked(StreamingSideInputFetcher.java:186)
>>   at com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunne
>> r.processElement(StreamingSideInputDoFnRunner.java:70)
>>   at com.google.cloud.dataflow.worker.SimpleParDoFn.processElemen
>> t(SimpleParDoFn.java:233)
>>   at com.google.cloud.dataflow.worker.util.common.worker.ParDoOpe
>> ration.process(ParDoOperation.java:48)
>>   at com.google.cloud.dataflow.worker.util.common.worker.OutputRe
>> ceiver.process(OutputReceiver.java:52)
>>   at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(Simp
>> leParDoFn.java:183)
>>
>
>


Re: "processing lull"

2017-10-29 Thread Jacob Marble
- This does not happen when I don't use the reshuffle hack
- HTTP QPS seems to be improved with reshuffling, but also seems to
burst-and-pause
- The "processing lull" log entry occurs 4 times every 5 minutes
- Right now, I'm guessing that "processing lull" means that a map operation
is taking too long

Jacob

On Sun, Oct 29, 2017 at 9:15 PM, Jacob Marble <jmar...@kochava.com> wrote:

> Good evening-
>
> What should I make of the log warning "processing lull for [instant] in
> state windmill-read" ?
>
> - This happens in a streaming pipeline, in Dataflow.
> - The DoFn that emits the log entry makes HTTP requests to a third-party.
> - This only happens when I added a side input to the PTransform, to
> prevent fusing.
> - The side input is a SingletonView, just an empty string value.
>
> Thanks as usual,
>
> Jacob
>
> Processing lull for PT300.124S in state windmill-read of [step name]
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at com.google.cloud.dataflow.worker.repackaged.com.google.
> common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)
>   at com.google.cloud.dataflow.worker.repackaged.com.google.
> common.util.concurrent.AbstractFuture$TrustedFuture.
> get(AbstractFuture.java:76)
>   at com.google.cloud.dataflow.worker.MetricTrackingWindmillServerSt
> ub.getStateData(MetricTrackingWindmillServerStub.java:188)
>   at com.google.cloud.dataflow.worker.WindmillStateReader.
> startBatchAndBlock(WindmillStateReader.java:405)
>   at com.google.cloud.dataflow.worker.WindmillStateReader$
> WrappedFuture.get(WindmillStateReader.java:306)
>   at com.google.cloud.dataflow.worker.WindmillStateInternals$
> WindmillValue.read(WindmillStateInternals.java:384)
>   at com.google.cloud.dataflow.worker.StreamingSideInputFetcher.
> blockedMap(StreamingSideInputFetcher.java:249)
>   at com.google.cloud.dataflow.worker.StreamingSideInputFetcher.
> storeIfBlocked(StreamingSideInputFetcher.java:186)
>   at com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.
> processElement(StreamingSideInputDoFnRunner.java:70)
>   at com.google.cloud.dataflow.worker.SimpleParDoFn.
> processElement(SimpleParDoFn.java:233)
>   at com.google.cloud.dataflow.worker.util.common.worker.
> ParDoOperation.process(ParDoOperation.java:48)
>   at com.google.cloud.dataflow.worker.util.common.worker.
> OutputReceiver.process(OutputReceiver.java:52)
>   at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(
> SimpleParDoFn.java:183)
>


"processing lull"

2017-10-29 Thread Jacob Marble
Good evening-

What should I make of the log warning "processing lull for [instant] in
state windmill-read" ?

- This happens in a streaming pipeline, in Dataflow.
- The DoFn that emits the log entry makes HTTP requests to a third-party.
- This only happens when I added a side input to the PTransform, to prevent
fusing.
- The side input is a SingletonView, just an empty string value.
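
A minimal sketch of that setup (pipeline/events and CallThirdPartyFn are
placeholders for the actual pipeline and the DoFn that makes the HTTP calls):

PCollectionView<String> emptySingleton =
    pipeline.apply("empty string", Create.of(""))
            .apply(View.<String>asSingleton());

events.apply("call third-party API",
    ParDo.of(new CallThirdPartyFn())
         .withSideInputs(emptySingleton));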

Thanks as usual,

Jacob

Processing lull for PT300.124S in state windmill-read of [step name]
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)
  at com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
  at com.google.cloud.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:188)
  at com.google.cloud.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:405)
  at com.google.cloud.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:306)
  at com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillValue.read(WindmillStateInternals.java:384)
  at com.google.cloud.dataflow.worker.StreamingSideInputFetcher.blockedMap(StreamingSideInputFetcher.java:249)
  at com.google.cloud.dataflow.worker.StreamingSideInputFetcher.storeIfBlocked(StreamingSideInputFetcher.java:186)
  at com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.processElement(StreamingSideInputDoFnRunner.java:70)
  at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:233)
  at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
  at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
  at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:183)


Re: How to window by quantity of data?

2017-10-20 Thread Jacob Marble
Final, working version is in the original gist:
https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c

The result is heavily inspired by GroupIntoBatches, and doesn't use
windowing.
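
For reference, a minimal sketch of plain GroupIntoBatches usage (not the gist
above — the key function, batch size, and downstream WriteBatchFn are illustrative):

PCollection<String> events = ...;

events
    .apply("key for batching",
        WithKeys.of((String e) -> e.hashCode() % 10)
                .withKeyType(TypeDescriptors.integers()))
    .apply("batch into groups of 100",
        GroupIntoBatches.<Integer, String>ofSize(100))
    .apply("write each batch", ParDo.of(new WriteBatchFn()));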

Jacob

On Wed, Oct 18, 2017 at 2:49 PM, Jacob Marble <jmar...@kochava.com> wrote:

> Thomas, I reworked using the GroupIntoBatches PTransform, and things are
> working great (with fewer lines of code).
>
> Thanks
>
> Jacob
>
> On Wed, Oct 18, 2017 at 1:12 PM, Jacob Marble <jmar...@kochava.com> wrote:
>
>> That gist isn't working right now, but I'll update it when I find the bug.
>>
>> The direct runner grows memory, but never writes files.
>> The dataflow runner writes temp files, but FinalizeGroupByKey never moves
>> them to the final destination.
>>
>> Jacob
>>
>> On Wed, Oct 18, 2017 at 12:55 PM, Jacob Marble <jmar...@kochava.com>
>> wrote:
>>
>>> Consider multiple instances of a DoFn:
>>>
>>> @ProcessElement
>>> public void window(ProcessContext context,
>>> @StateId("count") ValueState countState) {
>>>
>>> int count = MoreObjects.firstNonNull(countState.read(), 0);
>>> count += 1;
>>> countState.write(count);
>>>
>>> If two instances read countState, then write countState, could countState
>>> end up incremented by only 1 rather than 2?
>>>
>>> Jacob
>>>
>>> On Wed, Oct 18, 2017 at 12:43 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> What do you mean by non-atomic?
>>>>
>>>> All output/state changes/timers from a process bundle are an all or
>>>> nothing change. So if processing a bundle fails, any state changes are
>>>> discarded and the state is reset to what it was before the bundle was
>>>> processed.
>>>>
>>>> On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble <jmar...@kochava.com>
>>>> wrote:
>>>>
>>>>> Here's a gist: https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
>>>>>
>>>>> Should I consider StateId value mutations to be non-atomic?
>>>>>
>>>>> Jacob
>>>>>
>>>>> On Wed, Oct 18, 2017 at 9:25 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Feel free to share it with an online paste or a link to a github repo
>>>>>> containing the code.
>>>>>>
>>>>>> Other users may be interested in your solution.
>>>>>>
>>>>>> On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <jmar...@kochava.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Lukasz-
>>>>>>>
>>>>>>> That worked. I created a stateful DoFn with a stale timer, an
>>>>>>> initial timestamp state, and a counter state, along with a buffer of
>>>>>>> elements to bundle. When the counter or timer exceeds max values,
>>>>>>> outputWithTimestamp().
>>>>>>>
>>>>>>> I'm happy to post the entire implementation somewhere, not sure
>>>>>>> about etiquette and how this mailing list handles attachments.
>>>>>>>
>>>>>>> Jacob
>>>>>>>
>>>>>>> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Have you considered using a stateful DoFn, buffering/batching based
>>>>>>>> upon a certain number of elements is shown in this blog[1] and could be
>>>>>>>> extended for your usecase.
>>>>>>>>
>>>>>>>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>>>>>
>>>>>>>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <jmar...@kochava.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> My first streaming pipeline is pretty simple, it just pipes a
>>>>>>>>> queue into files:
>>>>>>>>>
>>>>>>>>> - read JSON objects from PubsubIO
>>>>>>>>> - event time = processing time
>>>>>>>>> - 5 minute windows (
>>>>>>>>> - write n files to GCS, (TextIO.withNumShards() not dynamic)
>>>>>>>>>
>>>>>>>>> When the pipeline gets behind (for example, when the pipeline is
>>>>>>>>> stopped for an hour and restarted) this creates problems because the 
>>>>>>>>> amount
>>>>>>>>> of data per file becomes too much, and the pipeline stays behind.
>>>>>>>>>
>>>>>>>>> I believe that what is needed is a new step, just before "write to
>>>>>>>>> GCS":
>>>>>>>>>
>>>>>>>>> - split/partition/window into ceil(totalElements / maxElements)
>>>>>>>>> groups
>>>>>>>>>
>>>>>>>>> My next idea is to implement my own Partition and PartitionDoFn
>>>>>>>>> that accept a PCollectionView from Count.perElement().
>>>>>>>>>
>>>>>>>>> Is there a more built-in way to accomplish dynamic partitions by
>>>>>>>>> element quantity?
>>>>>>>>>
>>>>>>>>> Jacob
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Is anyone using Beam for geo use cases?

2017-10-20 Thread Jacob Marble
I wrote an OSM PBF parser for Beam. It took ~8h to parse the planet on one
core, but that could be improved with a reshuffle after the actual read
transform.

I also wrote a simple key class. Useful for bucketing geo data into smaller
chunks.
- constructor takes desired grid size in kilometers
- get key from lat/lon
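
Not the actual class, but a rough sketch of such a grid key (assumes a simple
degrees-per-kilometer approximation, which is usually fine for bucketing):

class GridKey {
  private final double cellDegreesLat;

  GridKey(double gridSizeKm) {
    // roughly 111 km per degree of latitude
    this.cellDegreesLat = gridSizeKm / 111.0;
  }

  String keyFor(double lat, double lon) {
    long row = (long) Math.floor(lat / cellDegreesLat);
    // widen longitude cells toward the poles so cells stay ~gridSizeKm across
    double cellDegreesLon =
        cellDegreesLat / Math.max(0.01, Math.cos(Math.toRadians(lat)));
    long col = (long) Math.floor(lon / cellDegreesLon);
    return row + "," + col;
  }
}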

Jacob

On Fri, Oct 20, 2017 at 4:16 AM, Vilhelm von Ehrenheim <
vonehrenh...@gmail.com> wrote:

> I am interested! I need to do this soon but havent started yet. Would love
> to see an example of how you do it. :)
>
> Br,
> Vilhelm von Ehrenheim
>
> On 20 Oct 2017 12:40, "Csaba Kassai" <csaba.kas...@doctusoft.com> wrote:
>
>> Hi Jacob,
>>
>> we are doing the opposite direction: we enrich data with geo coordinates
>> from textual address using Google Maps API with Cloud Dataflow.
>> Are you interested in this use-case?
>>
>> Csabi
>>
>> On Thu, 19 Oct 2017 at 21:00 Jacob Marble <jmar...@kochava.com> wrote:
>>
>>> Is anyone using Beam to solve geo problems?
>>>
>>> For example, a simple "reverse geo" function:
>>> f(lat, lon) => country, state/province/etc, city, postal code
>>>
>>> Jacob
>>>
>>


Is anyone using Beam for geo use cases?

2017-10-19 Thread Jacob Marble
Is anyone using Beam to solve geo problems?

For example, a simple "reverse geo" function:
f(lat, lon) => country, state/province/etc, city, postal code

Jacob


Re: How to window by quantity of data?

2017-10-18 Thread Jacob Marble
Thomas, I reworked using the GroupIntoBatches PTransform, and things are
working great (with fewer lines of code).

Thanks

Jacob

On Wed, Oct 18, 2017 at 1:12 PM, Jacob Marble <jmar...@kochava.com> wrote:

> That gist isn't working right now, but I'll update it when I find the bug.
>
> The direct runner grows memory, but never writes files.
> The dataflow runner writes temp files, but FinalizeGroupByKey never moves
> them to the final destination.
>
> Jacob
>
> On Wed, Oct 18, 2017 at 12:55 PM, Jacob Marble <jmar...@kochava.com>
> wrote:
>
>> Consider multiple instances of a DoFn:
>>
>> @ProcessElement
>> public void window(ProcessContext context,
>> @StateId("count") ValueState countState) {
>>
>> int count = MoreObjects.firstNonNull(countState.read(), 0);
>> count += 1;
>> countState.write(count);
>>
>> If two instances read countState, then write countState, could countState
>> end up incremented by only 1 rather than 2?
>>
>> Jacob
>>
>> On Wed, Oct 18, 2017 at 12:43 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> What do you mean by non-atomic?
>>>
>>> All output/state changes/timers from a process bundle are an all or
>>> nothing change. So if processing a bundle fails, any state changes are
>>> discarded and the state is reset to what it was before the bundle was
>>> processed.
>>>
>>> On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble <jmar...@kochava.com>
>>> wrote:
>>>
>>>> Here's a gist: https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
>>>>
>>>> Should I consider StateId value mutations to be non-atomic?
>>>>
>>>> Jacob
>>>>
>>>> On Wed, Oct 18, 2017 at 9:25 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Feel free to share it with an online paste or a link to a github repo
>>>>> containing the code.
>>>>>
>>>>> Other users may be interested in your solution.
>>>>>
>>>>> On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <jmar...@kochava.com>
>>>>> wrote:
>>>>>
>>>>>> Lukasz-
>>>>>>
>>>>>> That worked. I created a stateful DoFn with a stale timer, an initial
>>>>>> timestamp state, and a counter state, along with a buffer of elements to
>>>>>> bundle. When the counter or timer exceeds max values, 
>>>>>> outputWithTimestamp().
>>>>>>
>>>>>> I'm happy to post the entire implementation somewhere, not sure about
>>>>>> etiquette and how this mailing list handles attachments.
>>>>>>
>>>>>> Jacob
>>>>>>
>>>>>> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <lc...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Have you considered using a stateful DoFn, buffering/batching based
>>>>>>> upon a certain number of elements is shown in this blog[1] and could be
>>>>>>> extended for your usecase.
>>>>>>>
>>>>>>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>>>>
>>>>>>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <jmar...@kochava.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> My first streaming pipeline is pretty simple, it just pipes a queue
>>>>>>>> into files:
>>>>>>>>
>>>>>>>> - read JSON objects from PubsubIO
>>>>>>>> - event time = processing time
>>>>>>>> - 5 minute windows (
>>>>>>>> - write n files to GCS, (TextIO.withNumShards() not dynamic)
>>>>>>>>
>>>>>>>> When the pipeline gets behind (for example, when the pipeline is
>>>>>>>> stopped for an hour and restarted) this creates problems because the 
>>>>>>>> amount
>>>>>>>> of data per file becomes too much, and the pipeline stays behind.
>>>>>>>>
>>>>>>>> I believe that what is needed is a new step, just before "write to
>>>>>>>> GCS":
>>>>>>>>
>>>>>>>> - split/partition/window into ceil(totalElements / maxElements)
>>>>>>>> groups
>>>>>>>>
>>>>>>>> My next idea is to implement my own Partition and PartitionDoFn
>>>>>>>> that accept a PCollectionView from Count.perElemen().
>>>>>>>>
>>>>>>>> Is there a more built-in way to accomplish dynamic partitions by
>>>>>>>> element quantity?
>>>>>>>>
>>>>>>>> Jacob
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to window by quantity of data?

2017-10-18 Thread Jacob Marble
That gist isn't working right now, but I'll update it when I find the bug.

The direct runner grows memory, but never writes files.
The dataflow runner writes temp files, but FinalizeGroupByKey never moves
them to the final destination.

Jacob

On Wed, Oct 18, 2017 at 12:55 PM, Jacob Marble <jmar...@kochava.com> wrote:

> Consider multiple instances of a DoFn:
>
> @ProcessElement
> public void window(ProcessContext context,
> @StateId("count") ValueState countState) {
>
> int count = MoreObjects.firstNonNull(countState.read(), 0);
> count += 1;
> countState.write(count);
>
> If two instances read countState, then write countState, will countState
> not be incremented by 1, but not by 2?
>
> Jacob
>
> On Wed, Oct 18, 2017 at 12:43 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> What do you mean by non-atomic?
>>
>> All output/state changes/timers from a process bundle are an all or
>> nothing change. So if processing a bundle fails, any state changes are
>> discarded and the state is reset to what it was before the bundle was
>> processed.
>>
>> On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble <jmar...@kochava.com>
>> wrote:
>>
>>> Here's a gist: https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
>>>
>>> Should I consider StateId value mutations to be non-atomic?
>>>
>>> Jacob
>>>
>>> On Wed, Oct 18, 2017 at 9:25 AM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Feel free to share it with an online paste or a link to a github repo
>>>> containing the code.
>>>>
>>>> Other users may be interested in your solution.
>>>>
>>>> On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <jmar...@kochava.com>
>>>> wrote:
>>>>
>>>>> Lukasz-
>>>>>
>>>>> That worked. I created a stateful DoFn with a stale timer, an initial
>>>>> timestamp state, and a counter state, along with a buffer of elements to
>>>>> bundle. When the counter or timer exceeds max values, 
>>>>> outputWithTimestamp().
>>>>>
>>>>> I'm happy to post the entire implementation somewhere, not sure about
>>>>> etiquette and how this mailing list handles attachments.
>>>>>
>>>>> Jacob
>>>>>
>>>>> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Have you considered using a stateful DoFn, buffering/batching based
>>>>>> upon a certain number of elements is shown in this blog[1] and could be
>>>>>> extended for your usecase.
>>>>>>
>>>>>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>>>
>>>>>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <jmar...@kochava.com>
>>>>>> wrote:
>>>>>>
>>>>>>> My first streaming pipeline is pretty simple, it just pipes a queue
>>>>>>> into files:
>>>>>>>
>>>>>>> - read JSON objects from PubsubIO
>>>>>>> - event time = processing time
>>>>>>> - 5 minute windows (
>>>>>>> - write n files to GCS, (TextIO.withNumShards() not dynamic)
>>>>>>>
>>>>>>> When the pipeline gets behind (for example, when the pipeline is
>>>>>>> stopped for an hour and restarted) this creates problems because the 
>>>>>>> amount
>>>>>>> of data per file becomes too much, and the pipeline stays behind.
>>>>>>>
>>>>>>> I believe that what is needed is a new step, just before "write to
>>>>>>> GCS":
>>>>>>>
>>>>>>> - split/partition/window into ceil(totalElements / maxElements)
>>>>>>> groups
>>>>>>>
>>>>>>> My next idea is to implement my own Partition and PartitionDoFn that
>>>>>>> accept a PCollectionView from Count.perElemen().
>>>>>>>
>>>>>>> Is there a more built-in way to accomplish dynamic partitions by
>>>>>>> element quantity?
>>>>>>>
>>>>>>> Jacob
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to window by quantity of data?

2017-10-18 Thread Jacob Marble
Consider multiple instances of a DoFn:

@ProcessElement
public void window(ProcessContext context,
    @StateId("count") ValueState<Integer> countState) {

  // Read-modify-write of the per-key counter.
  int count = MoreObjects.firstNonNull(countState.read(), 0);
  count += 1;
  countState.write(count);
}

If two instances read countState and then both write it, could countState
end up incremented by only 1 instead of 2?

Jacob

On Wed, Oct 18, 2017 at 12:43 PM, Lukasz Cwik <lc...@google.com> wrote:

> What do you mean by non-atomic?
>
> All output/state changes/timers from a process bundle are an all or
> nothing change. So if processing a bundle fails, any state changes are
> discarded and the state is reset to what it was before the bundle was
> processed.
>
> On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble <jmar...@kochava.com>
> wrote:
>
>> Here's a gist: https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
>>
>> Should I consider StateId value mutations to be non-atomic?
>>
>> Jacob
>>
>> On Wed, Oct 18, 2017 at 9:25 AM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Feel free to share it with an online paste or a link to a github repo
>>> containing the code.
>>>
>>> Other users may be interested in your solution.
>>>
>>> On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble <jmar...@kochava.com>
>>> wrote:
>>>
>>>> Lukasz-
>>>>
>>>> That worked. I created a stateful DoFn with a stale timer, an initial
>>>> timestamp state, and a counter state, along with a buffer of elements to
>>>> bundle. When the counter or timer exceeds max values, 
>>>> outputWithTimestamp().
>>>>
>>>> I'm happy to post the entire implementation somewhere, not sure about
>>>> etiquette and how this mailing list handles attachments.
>>>>
>>>> Jacob
>>>>
>>>> On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Have you considered using a stateful DoFn, buffering/batching based
>>>>> upon a certain number of elements is shown in this blog[1] and could be
>>>>> extended for your usecase.
>>>>>
>>>>> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>>
>>>>> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <jmar...@kochava.com>
>>>>> wrote:
>>>>>
>>>>>> My first streaming pipeline is pretty simple, it just pipes a queue
>>>>>> into files:
>>>>>>
>>>>>> - read JSON objects from PubsubIO
>>>>>> - event time = processing time
>>>>>> - 5 minute windows (
>>>>>> - write n files to GCS, (TextIO.withNumShards() not dynamic)
>>>>>>
>>>>>> When the pipeline gets behind (for example, when the pipeline is
>>>>>> stopped for an hour and restarted) this creates problems because the 
>>>>>> amount
>>>>>> of data per file becomes too much, and the pipeline stays behind.
>>>>>>
>>>>>> I believe that what is needed is a new step, just before "write to
>>>>>> GCS":
>>>>>>
>>>>>> - split/partition/window into ceil(totalElements / maxElements) groups
>>>>>>
>>>>>> My next idea is to implement my own Partition and PartitionDoFn that
>>>>>> accept a PCollectionView from Count.perElemen().
>>>>>>
>>>>>> Is there a more built-in way to accomplish dynamic partitions by
>>>>>> element quantity?
>>>>>>
>>>>>> Jacob
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to window by quantity of data?

2017-10-17 Thread Jacob Marble
Lukasz-

That worked. I created a stateful DoFn with a stale timer, an initial
timestamp state, and a counter state, along with a buffer of elements to
bundle. When the counter or the timer exceeds its max value, the buffer is
flushed with outputWithTimestamp().

I'm happy to post the entire implementation somewhere, not sure about
etiquette and how this mailing list handles attachments.

Jacob
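
Roughly the shape of that DoFn, simplified (a sketch only: the batch size,
staleness duration, coders, and names are illustrative, and the
initial-timestamp/outputWithTimestamp bookkeeping is omitted):

import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Buffers values per key, flushes at a max count or after a staleness timeout.
class BatchByCountFn extends DoFn<KV<String, String>, Iterable<String>> {
  private static final int MAX_BATCH_SIZE = 1000;                            // illustrative
  private static final Duration MAX_STALENESS = Duration.standardMinutes(5); // illustrative

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag(StringUtf8Coder.of());

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value(VarIntCoder.of());

  @TimerId("stale")
  private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(ProcessContext context,
      @StateId("buffer") BagState<String> buffer,
      @StateId("count") ValueState<Integer> count,
      @TimerId("stale") Timer stale) {
    int n = MoreObjects.firstNonNull(count.read(), 0);
    if (n == 0) {
      // First element of a new batch: arm the staleness timer.
      stale.offset(MAX_STALENESS).setRelative();
    }
    buffer.add(context.element().getValue());
    n += 1;
    if (n >= MAX_BATCH_SIZE) {
      // Materialize before clearing, since BagState.read() may be lazy.
      context.output(ImmutableList.copyOf(buffer.read()));
      buffer.clear();
      count.write(0);
    } else {
      count.write(n);
    }
  }

  @OnTimer("stale")
  public void onStale(OnTimerContext context,
      @StateId("buffer") BagState<String> buffer,
      @StateId("count") ValueState<Integer> count) {
    // Flush whatever accumulated when the batch goes stale.
    if (MoreObjects.firstNonNull(count.read(), 0) > 0) {
      context.output(ImmutableList.copyOf(buffer.read()));
    }
    buffer.clear();
    count.write(0);
  }
}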

On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik <lc...@google.com> wrote:

> Have you considered using a stateful DoFn, buffering/batching based upon a
> certain number of elements is shown in this blog[1] and could be extended
> for your usecase.
>
> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
>
> On Tue, Oct 17, 2017 at 2:46 PM, Jacob Marble <jmar...@kochava.com> wrote:
>
>> My first streaming pipeline is pretty simple, it just pipes a queue into
>> files:
>>
>> - read JSON objects from PubsubIO
>> - event time = processing time
>> - 5 minute windows (
>> - write n files to GCS, (TextIO.withNumShards() not dynamic)
>>
>> When the pipeline gets behind (for example, when the pipeline is stopped
>> for an hour and restarted) this creates problems because the amount of data
>> per file becomes too much, and the pipeline stays behind.
>>
>> I believe that what is needed is a new step, just before "write to GCS":
>>
>> - split/partition/window into ceil(totalElements / maxElements) groups
>>
>> My next idea is to implement my own Partition and PartitionDoFn that
>> accept a PCollectionView from Count.perElemen().
>>
>> Is there a more built-in way to accomplish dynamic partitions by element
>> quantity?
>>
>> Jacob
>>
>
>


How to window by quantity of data?

2017-10-17 Thread Jacob Marble
My first streaming pipeline is pretty simple, it just pipes a queue into
files:

- read JSON objects from PubsubIO
- event time = processing time
- 5 minute windows
- write n files to GCS (TextIO.withNumShards() is not dynamic)

When the pipeline gets behind (for example, when the pipeline is stopped
for an hour and restarted) this creates problems because the amount of data
per file becomes too much, and the pipeline stays behind.

I believe that what is needed is a new step, just before "write to GCS":

- split/partition/window into ceil(totalElements / maxElements) groups

My next idea is to implement my own Partition and PartitionDoFn that accept
a PCollectionView from Count.perElement().
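
A rough sketch of that idea (assuming the total count from Count.globally()
as a singleton side input; the "elements" collection name and maxElements
value are illustrative, and windowing/triggering details are ignored here):

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

// elements: PCollection<String>; maxElements: cap per group (illustrative).
// (For windowed/streaming input this would also need triggering and
// Combine.Globally.withoutDefaults(); bounded input assumed here.)
final long maxElements = 100_000L;

PCollectionView<Long> totalView =
    elements.apply("Count all", Count.<String>globally()).apply(View.asSingleton());

PCollection<KV<Long, String>> regrouped = elements.apply("Assign group",
    ParDo.of(new DoFn<String, KV<Long, String>>() {
      @ProcessElement
      public void assign(ProcessContext c) {
        long groups = Math.max(1L, (c.sideInput(totalView) + maxElements - 1) / maxElements);
        // Scatter elements uniformly over ceil(total / maxElements) groups.
        c.output(KV.of(ThreadLocalRandom.current().nextLong(groups), c.element()));
      }
    }).withSideInputs(totalView));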

Is there a more built-in way to accomplish dynamic partitions by element
quantity?

Jacob


DoFn setup/teardown sequence

2017-10-15 Thread Jacob Marble
(there might be documentation on this that I didn't find; if so a link is
sufficient)

Good evening, this is just a check on my understanding. It looks like an
instance of a given DoFn goes through this lifecycle. Am I correct?

- constructor
- @Setup (once)
  - @StartBundle (zero to many times)
    - @ProcessElement (zero to many times)
  - @FinishBundle
- @Teardown (once)
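
A minimal sketch of where those hooks live on a DoFn (method names are
illustrative):

import org.apache.beam.sdk.transforms.DoFn;

class MyFn extends DoFn<String, String> {
  MyFn() { /* constructor: runs when the pipeline is constructed/serialized */ }

  @Setup
  public void setup() { /* once per DoFn instance, e.g. open a client */ }

  @StartBundle
  public void startBundle() { /* before each bundle */ }

  @ProcessElement
  public void processElement(ProcessContext c) { c.output(c.element()); }

  @FinishBundle
  public void finishBundle() { /* after each bundle */ }

  @Teardown
  public void teardown() { /* once, before the instance is discarded */ }
}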

Can any of these steps be called concurrently? (I believe no)
Can one worker execute multiple instances of a DoFn? (I believe yes)

Thank you,

Jacob