> I've had many conversations with people who want a message to not be
> ack'ed until all downstream consequences are committed to external
> user-specified storage. But I don't know how to do that.
I'm essentially talking about a special case of the above: a non-stateful
DoFn at the end of a pipeline that commits results to external storage as
part of finalizing each bundle. This assumes there's no windowing involved
and that the DoFn buffers messages in some internal state that isn't exposed
to Beam. This scenario is well outside Beam's intended use, there may be
fundamental reasons why tracking internal to a DoFn will fail, and there are
likely scenarios where a bundle succeeds multiple times and produces
duplicate external output. But, from what I understand in this conversation
so far, it may actually achieve at-least-once delivery in the face of a job
being cancelled without draining.

This is veering towards code golf, and I think the small and uncontrollable
size of bundles kills the idea anyway. But I really appreciate the
discussion of potential approaches and of how the internals of the Beam
model affect what's possible here.

On Fri, Apr 5, 2019 at 11:50 AM Kenneth Knowles <[email protected]> wrote:

> Clarification: using a stateful ParDo operation may require *data to be
> shuffled*, which is just one of multiple steps in a full GBK operation.
> The grouping by windows that is typically done after the data is shuffled
> can also be expensive, and you will not incur that cost.
>
> Your analysis of when things get ack'd is correct. Stateful ParDo will
> still ack the messages before it runs.
>
> The fundamental problem is that you simply have to trust your data
> processing engine's checkpointing and drain, be it Dataflow shuffling or
> Flink finalizing a checkpoint, etc. I can see how you might want to remove
> this dependency on the internal storage mechanisms of a data processing
> engine. I've had many conversations with people who want a message to not
> be ack'ed until all downstream consequences are committed to external
> user-specified storage. But I don't know how to do that.
> Kenn
>
> On Fri, Apr 5, 2019 at 6:39 AM Robert Bradshaw <[email protected]> wrote:
>
>> True, though there's also a grouping inserted by the pubsub read
>> operation itself to remove duplicates (PubSub is at-least-once
>> delivery). Drain should work for a pipeline like this without any
>> issues.
>>
>> On Fri, Apr 5, 2019 at 3:33 PM Jeff Klukas <[email protected]> wrote:
>>
>>> Thanks for the suggestions. Ideally, I'd like to avoid GBK completely,
>>> which, if I understand the semantics correctly, would mean messages
>>> don't get ack'd to PubSub until my @FinalizeBundle method has been
>>> called and the messages are persisted in GCS. If I use
>>> GroupIntoBatches, messages will get ack'd at the GBK and checkpointed,
>>> which means if we aren't able to properly drain the job, we lose that
>>> checkpointed state and drop messages.
>>>
>>> But GroupIntoBatches could potentially allow us to at least achieve
>>> fewer GBK operations.
>>>
>>> On Fri, Apr 5, 2019 at 3:48 AM Robert Bradshaw <[email protected]> wrote:
>>>
>>>> Yes, you can leverage GroupIntoBatches to do this. This may still
>>>> require a GBK, as all state is keyed, and the cardinality of your
>>>> keyset would need to be chosen carefully to find a balance between
>>>> getting sufficient batching and getting sufficient parallelization.
>>>> If you write a DoFn with side effects (such as writing to GCS), you
>>>> also need to be careful about handling idempotence (e.g. in case a
>>>> bundle is re-tried after failure).
>>>>
>>>> On Thu, Apr 4, 2019 at 10:17 PM Ismaël Mejía <[email protected]> wrote:
>>>>
>>>>> It seems you can 'hack' it with the State API.
>>>>> See the discussion on this ticket:
>>>>> https://issues.apache.org/jira/browse/BEAM-6886
>>>>>
>>>>> On Thu, Apr 4, 2019 at 9:42 PM Jeff Klukas <[email protected]> wrote:
>>>>>
>>>>>> As far as I can tell, Beam expects runners to have full control over
>>>>>> the separation of individual elements into bundles, and this is
>>>>>> something users have no control over. Is that true? Or are there any
>>>>>> ways that I might exert some influence over bundle sizes?
>>>>>>
>>>>>> My main interest at the moment is investigating lighter-weight
>>>>>> alternatives to FileIO for a simple but high-throughput Dataflow job
>>>>>> that batches up messages from Pub/Sub and sinks them to GCS. I'm
>>>>>> imagining a ParDo that buffers incoming messages and then writes
>>>>>> them all as an object to GCS in a @FinalizeBundle method, avoiding
>>>>>> the multiple GroupByKey operations needed for writing sharded output
>>>>>> from FileIO.
>>>>>>
>>>>>> The problem is that bundles in practice look to be far too small to
>>>>>> make this feasible. I deployed a 4-node test job that simply reads
>>>>>> ~30k messages per second from Pub/Sub and passes them to a transform
>>>>>> that publishes some metrics about the bundles passing through. I
>>>>>> found a mean bundle size of ~500 elements corresponding to ~10 MB of
>>>>>> data, which is too small for the proposed approach to be feasible.
>>>>>> Are there any tricks I could use to coerce Dataflow to increase the
>>>>>> size of bundles?
>>>>>>
>>>>>> I realize this is basically an abuse of the Beam programming model,
>>>>>> but the alternative I'm looking at is having to write a custom
>>>>>> application using the google-cloud APIs and deploying it on
>>>>>> Kubernetes.
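[Editor's note] The buffer-then-flush-on-finalize pattern discussed in this
thread can be sketched as follows. This is a hypothetical, stdlib-only model
of the bundle lifecycle, not real Beam code: the class and method names only
mirror Beam's @ProcessElement / @FinalizeBundle annotations for
illustration. It shows why a retried bundle yields duplicate external
output, i.e. at-least-once rather than exactly-once delivery.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stdlib-only model (NOT real Beam code) of the pattern
// discussed above: buffer elements during a bundle, commit them to
// external storage only when the bundle is finalized.
class BufferingSink {
  private final List<String> buffer = new ArrayList<>();
  // Stands in for the external store (e.g. one GCS object per bundle).
  final List<List<String>> external = new ArrayList<>();

  // Analogue of a @ProcessElement method: no external side effects yet,
  // elements only accumulate in process-local memory invisible to Beam.
  void processElement(String element) {
    buffer.add(element);
  }

  // Analogue of a @FinalizeBundle method: flush the whole buffer as one
  // write. If the runner retries a bundle after a failure here, the same
  // elements are written again, so the destination must tolerate
  // duplicates (at-least-once, not exactly-once).
  void finalizeBundle() {
    if (!buffer.isEmpty()) {
      external.add(new ArrayList<>(buffer));
      buffer.clear();
    }
  }
}
```

As the thread concludes, even if this lifecycle holds up, the small and
uncontrollable bundle sizes observed in practice (~500 elements) make each
flushed object too small for this to replace FileIO.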
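[Editor's note] Robert's point about keyset cardinality for GroupIntoBatches
can be made concrete with a small sketch. Since all Beam state is keyed,
batching requires assigning each element a key, and the number of distinct
keys caps parallelism while too many keys fragments batches. The shard-count
knob and class below are illustrative assumptions, not a Beam API.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of the key-cardinality trade-off: pick a small,
// fixed key space before a keyed batching step. Fewer shards means
// bigger batches but less parallelism; more shards means the opposite.
class ShardKeyAssigner {
  private final int numShards;

  ShardKeyAssigner(int numShards) {
    if (numShards <= 0) {
      throw new IllegalArgumentException("numShards must be positive");
    }
    this.numShards = numShards;
  }

  // Random assignment spreads elements roughly evenly over the key
  // space, similar in spirit to the random-shard keying used by
  // sharded file sinks.
  int assignKey() {
    return ThreadLocalRandom.current().nextInt(numShards);
  }
}
```

A reasonable starting point is setting the shard count near the number of
parallel writers you want, then tuning against observed batch sizes.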
