Thanks for the suggestions. Ideally, I'd like to avoid GBK completely,
which, if I understand the semantics correctly, would mean messages don't
get ack'd to Pub/Sub until my @FinalizeBundle method has been called and
the messages are persisted in GCS. If I use GroupIntoBatches, messages will
get ack'd at the GBK and checkpointed, which means that if we aren't able to
properly drain the job, we lose that checkpointed state and drop messages.

But GroupIntoBatches could at least let us reduce the number of GBK
operations.
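For anyone following along, the buffer-then-flush pattern being discussed can be illustrated with a plain-Java sketch (this is just an analogy, not the Beam API; the class and method names are hypothetical): elements accumulate until a size threshold is hit, then flush as one batch, with a final flush for whatever remains, analogous to finishing a bundle.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java analogy (not Beam API) of the buffer-then-flush pattern:
 * elements accumulate until a size threshold, then flush as one batch,
 * with a final flush for the remainder (like bundle finalization).
 */
public class BatchBuffer {
    public static List<List<String>> batch(List<String> elements, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String e : elements) {
            current.add(e);
            if (current.size() >= batchSize) {   // flush on reaching the threshold
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {                // final flush of the partial batch
            batches.add(current);
        }
        return batches;
    }
}
```

In the real pipeline, the flush step would be a GCS write; the key question in this thread is whether the ack happens before or after that write.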

On Fri, Apr 5, 2019 at 3:48 AM Robert Bradshaw <[email protected]> wrote:

> Yes, you can leverage GroupIntoBatches to do this. This may still
> require a GBK, as all state is keyed, and the cardinality of your keyset
> would need to be chosen carefully to strike a balance between getting
> sufficient batching and getting sufficient parallelization. If you
> write a DoFn with side effects (such as writing to GCS), you also need
> to be careful about handling idempotence (e.g. in case a bundle is
> retried after failure).
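To make the keyset-cardinality point concrete: since state is per-key, a common trick is to assign each element a random shard key from a small fixed range, so the number of distinct keys caps parallelism while keeping each key's batches large. A stdlib-only sketch (the class name and shard count are hypothetical, not from the Beam API):

```java
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical shard-key assignment: keyed state means the number of
 * distinct keys bounds parallelism, so picking, say, 16 shard keys
 * trades parallelism for larger per-key batches.
 */
public class ShardKey {
    // Assign each element a uniformly random key in [0, numShards).
    public static int assign(int numShards) {
        return ThreadLocalRandom.current().nextInt(numShards);
    }
}
```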
>
> On Thu, Apr 4, 2019 at 10:17 PM Ismaël Mejía <[email protected]> wrote:
> >
> > It seems you can 'hack' it with the State API. See the discussion on
> > this ticket:
> > https://issues.apache.org/jira/browse/BEAM-6886
> >
> > On Thu, Apr 4, 2019 at 9:42 PM Jeff Klukas <[email protected]> wrote:
> > >
> > > As far as I can tell, Beam expects runners to have full control over
> separation of individual elements into bundles, and this is something users
> have no control over. Is that true? Or are there any ways I might
> exert some influence over bundle sizes?
> > >
> > > My main interest at the moment is investigating lighter-weight
> alternatives to FileIO for a simple but high-throughput Dataflow job that
> batches up messages from Pub/Sub and sinks them to GCS. I'm imagining a
> ParDo that buffers incoming messages and then writes them all as an object
> to GCS in a @FinalizeBundle method, avoiding the multiple GroupByKey
> operations needed for writing sharded output from FileIO.
> > >
> > > The problem is that bundles in practice appear to be far too small to
> make this feasible. I deployed a 4-node test job that simply reads ~30k
> messages per second from Pub/Sub and passes them to a transform that
> publishes some metrics about the bundles passing through. I found a mean
> bundle size of ~500 elements, corresponding to ~10 MB of data, which is too
> small for the proposed approach to be feasible. Are there any tricks I
> could use to coerce Dataflow into increasing the size of bundles?
> > >
> > > I realize this is basically an abuse of the Beam programming model,
> but the alternative I'm looking at is having to write a custom application
> using the google-cloud APIs and deploying it on Kubernetes.
>
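The bundle-metrics transform described above has a simple shape: count elements and bytes between the start and finish of each bundle. A plain-Java sketch of that shape (again an analogy, not the Beam API; in Beam these would be @StartBundle, @ProcessElement, and @FinishBundle methods on a DoFn):

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java shape (not Beam API) of a bundle-metrics transform:
 * track element and byte counts per bundle, recording them when
 * each bundle finishes.
 */
public class BundleMetrics {
    private long elementCount;
    private long byteCount;
    private final List<long[]> observed = new ArrayList<>();

    public void startBundle() {                   // analogous to @StartBundle
        elementCount = 0;
        byteCount = 0;
    }

    public void processElement(byte[] payload) {  // analogous to @ProcessElement
        elementCount++;
        byteCount += payload.length;
    }

    public void finishBundle() {                  // analogous to @FinishBundle
        observed.add(new long[] {elementCount, byteCount});
    }

    public List<long[]> observed() {
        return observed;
    }
}
```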
