Re: How to window by quantity of data?
Final, working version is in the original gist:
https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c

The result is heavily inspired by GroupIntoBatches, and doesn't use
windowing.

Jacob

On Wed, Oct 18, 2017 at 2:49 PM, Jacob Marble wrote:
> Thomas, I reworked using the GroupIntoBatches PTransform, and things are
> working great (with fewer lines of code).
>
> Thanks
>
> Jacob
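For readers following along, the transform can be applied roughly like this. This is a sketch only, assuming Beam's Java SDK; the key/value types and `maxBatchSize` are placeholders, not the contents of Jacob's gist:

```java
// Sketch (Beam Java SDK assumed; imports omitted): group keyed elements
// into batches of at most maxBatchSize elements; each resulting Iterable
// can then be written out as a single file.
long maxBatchSize = 500;
PCollection<KV<String, String>> keyed = ...; // e.g. JSON strings keyed by shard
PCollection<KV<String, Iterable<String>>> batches =
    keyed.apply(GroupIntoBatches.<String, String>ofSize(maxBatchSize));
```

Because GroupIntoBatches is keyed, the number of concurrent batches is bounded by the number of distinct keys.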
Re: How to window by quantity of data?
Thomas, I reworked using the GroupIntoBatches PTransform, and things are
working great (with fewer lines of code).

Thanks

Jacob

On Wed, Oct 18, 2017 at 1:12 PM, Jacob Marble wrote:
> That gist isn't working right now, but I'll update it when I find the bug.
>
> The direct runner grows memory, but never writes files.
> The Dataflow runner writes temp files, but FinalizeGroupByKey never moves
> them to the final destination.
>
> Jacob
Re: How to window by quantity of data?
That gist isn't working right now, but I'll update it when I find the bug.

The direct runner grows memory, but never writes files.
The Dataflow runner writes temp files, but FinalizeGroupByKey never moves
them to the final destination.

Jacob

On Wed, Oct 18, 2017 at 12:55 PM, Jacob Marble wrote:
> Consider multiple instances of a DoFn:
>
> @ProcessElement
> public void window(ProcessContext context,
>     @StateId("count") ValueState<Integer> countState) {
>
>   int count = MoreObjects.firstNonNull(countState.read(), 0);
>   count += 1;
>   countState.write(count);
> }
>
> If two instances read countState, then write countState, will countState
> be incremented by only 1, and not by 2?
>
> Jacob
Re: How to window by quantity of data?
The calls are essentially atomic per-key. More specifically, the two calls
can occur in one of two ways:

1) They are for elements which share a key. If so, the calls _must_ be made
serially, so the second read() will see the result of the first write().

2) They are for elements which do not share a key. If so, the state
instances are unique and independent, so both will contain 1.

As an aside, I unfortunately missed some of this thread - part of the
change may be duplication of the 'GroupIntoBatches' PTransform within the
Core SDK.

On Wed, Oct 18, 2017 at 12:55 PM, Jacob Marble wrote:
> Consider multiple instances of a DoFn:
>
> @ProcessElement
> public void window(ProcessContext context,
>     @StateId("count") ValueState<Integer> countState) {
>
>   int count = MoreObjects.firstNonNull(countState.read(), 0);
>   count += 1;
>   countState.write(count);
> }
>
> If two instances read countState, then write countState, will countState
> be incremented by only 1, and not by 2?
>
> Jacob
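The two cases above can be illustrated with a toy model in plain Java. This is an illustration of the per-key contract only, not the Beam API: each key owns an independent state cell, and read-modify-write calls for a given key happen serially, so no increment is lost.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedStateDemo {
    // Toy stand-in for per-key ValueState: one independent cell per key.
    static final Map<String, Integer> state = new HashMap<>();

    // Read-modify-write, as in the DoFn above; for a single key the runner
    // serializes these calls, so the second read sees the first write.
    static int increment(String key) {
        int count = state.getOrDefault(key, 0);
        count += 1;
        state.put(key, count);
        return count;
    }

    public static void main(String[] args) {
        // Same key: calls are serial, so the value goes 1, then 2.
        System.out.println(increment("a")); // 1
        System.out.println(increment("a")); // 2
        // Different key: independent cell, so it starts again at 1.
        System.out.println(increment("b")); // 1
    }
}
```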
Re: How to window by quantity of data?
Consider multiple instances of a DoFn:

@ProcessElement
public void window(ProcessContext context,
    @StateId("count") ValueState<Integer> countState) {

  int count = MoreObjects.firstNonNull(countState.read(), 0);
  count += 1;
  countState.write(count);
}

If two instances read countState, then write countState, will countState
be incremented by only 1, and not by 2?

Jacob

On Wed, Oct 18, 2017 at 12:43 PM, Lukasz Cwik wrote:
> What do you mean by non-atomic?
>
> All output/state changes/timers from a process bundle are an all-or-nothing
> change. So if processing a bundle fails, any state changes are discarded
> and the state is reset to what it was before the bundle was processed.
Re: How to window by quantity of data?
What do you mean by non-atomic?

All output/state changes/timers from a process bundle are an all-or-nothing
change. So if processing a bundle fails, any state changes are discarded and
the state is reset to what it was before the bundle was processed.

On Wed, Oct 18, 2017 at 12:15 PM, Jacob Marble wrote:
> Here's a gist:
> https://gist.github.com/jacobmarble/6ca40e0a14828e6a0dfe89b9cb2e4b4c
>
> Should I consider StateId value mutations to be non-atomic?
>
> Jacob
Re: How to window by quantity of data?
Feel free to share it with an online paste or a link to a github repo
containing the code.

Other users may be interested in your solution.

On Tue, Oct 17, 2017 at 9:05 PM, Jacob Marble wrote:
> Lukasz-
>
> That worked. I created a stateful DoFn with a stale timer, an initial
> timestamp state, and a counter state, along with a buffer of elements to
> bundle. When the counter or timer exceeds max values, outputWithTimestamp().
>
> I'm happy to post the entire implementation somewhere, not sure about
> etiquette and how this mailing list handles attachments.
>
> Jacob
Re: How to window by quantity of data?
Lukasz-

That worked. I created a stateful DoFn with a stale timer, an initial
timestamp state, and a counter state, along with a buffer of elements to
bundle. When the counter or timer exceeds max values, outputWithTimestamp().

I'm happy to post the entire implementation somewhere, not sure about
etiquette and how this mailing list handles attachments.

Jacob

On Tue, Oct 17, 2017 at 2:54 PM, Lukasz Cwik wrote:
> Have you considered using a stateful DoFn? Buffering/batching based upon a
> certain number of elements is shown in this blog[1] and could be extended
> for your use case.
>
> 1: https://beam.apache.org/blog/2017/08/28/timely-processing.html
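For anyone searching the archives, the shape Jacob describes looks roughly like the following. This is a sketch only, assuming the Beam Java SDK's state and timer API as shown in the blog post Lukasz linked; the names, the batch size, and the five-minute staleness bound are hypothetical, not Jacob's actual implementation:

```java
// Sketch (Beam Java SDK and Guava assumed; imports omitted): buffer
// elements per key, flushing when the buffer reaches MAX_BUFFER_SIZE or
// when the processing-time "stale" timer fires.
static final int MAX_BUFFER_SIZE = 500;

static class BatchFn extends DoFn<KV<String, String>, Iterable<String>> {
  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countSpec =
      StateSpecs.value(VarIntCoder.of());

  @TimerId("stale")
  private final TimerSpec staleSpec =
      TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(ProcessContext context,
      @StateId("buffer") BagState<String> buffer,
      @StateId("count") ValueState<Integer> count,
      @TimerId("stale") Timer stale) {
    int n = MoreObjects.firstNonNull(count.read(), 0);
    if (n == 0) {
      // First element of a new batch: arm the staleness timer.
      stale.offset(Duration.standardMinutes(5)).setRelative();
    }
    buffer.add(context.element().getValue());
    n += 1;
    if (n >= MAX_BUFFER_SIZE) {
      // Batch is full: emit it and reset.
      context.output(buffer.read());
      buffer.clear();
      n = 0;
    }
    count.write(n);
  }

  @OnTimer("stale")
  public void onStale(OnTimerContext context,
      @StateId("buffer") BagState<String> buffer,
      @StateId("count") ValueState<Integer> count) {
    // Timer fired before the batch filled: flush whatever is buffered.
    Integer n = count.read();
    if (n != null && n > 0) {
      context.output(buffer.read());
      buffer.clear();
      count.write(0);
    }
  }
}
```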
How to window by quantity of data?
My first streaming pipeline is pretty simple, it just pipes a queue into
files:

- read JSON objects from PubsubIO
- event time = processing time
- 5 minute windows
- write n files to GCS (TextIO.withNumShards(), not dynamic)

When the pipeline gets behind (for example, when the pipeline is stopped
for an hour and restarted) this creates problems because the amount of data
per file becomes too much, and the pipeline stays behind.

I believe that what is needed is a new step, just before "write to GCS":

- split/partition/window into ceil(totalElements / maxElements) groups

My next idea is to implement my own Partition and PartitionDoFn that accept
a PCollectionView from Count.perElement().

Is there a more built-in way to accomplish dynamic partitions by element
quantity?

Jacob
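The arithmetic behind the proposed step can be pinned down in plain Java. `numGroups` and `groupFor` are hypothetical helper names for illustration, not Beam APIs:

```java
public class DynamicShards {
    // Number of groups the proposal calls for: ceil(totalElements / maxElements),
    // computed with integer arithmetic to avoid floating point.
    static int numGroups(long totalElements, long maxElements) {
        return (int) ((totalElements + maxElements - 1) / maxElements);
    }

    // One way a PartitionDoFn could assign an element to a group:
    // round-robin by element index across the computed groups.
    static int groupFor(long elementIndex, int numGroups) {
        return (int) (elementIndex % numGroups);
    }

    public static void main(String[] args) {
        // 10,500 elements at 1,000 per file -> 11 files instead of a fixed n.
        System.out.println(numGroups(10_500, 1_000)); // 11
        System.out.println(groupFor(10_499, 11));     // 5
    }
}
```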