Yes, if you don't need strong delivery semantics, you can simply instantiate a writer in the DoFn's @Setup method and reuse it for every bundle in your write DoFn. If the writer supports a pool of connections, @Setup is a good place to create that too - then get a new connection for every bundle in @StartBundle.
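To make the lifecycle concrete, here is a minimal plain-Java sketch of it, written without Beam on the classpath so it runs on its own. Connection, ConnectionPool and WriteFn are hypothetical stand-ins, not Beam or Cassandra classes; the comments name the Beam annotation each method would carry in a real DoFn.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class WriterLifecycleSketch {

    /** Hypothetical connection; stands in for a real client session. */
    static final class Connection {
        final List<String> written = new ArrayList<>();

        void write(String row) {
            written.add(row);
        }
    }

    /** Hypothetical fixed-size pool; a real driver would supply its own. */
    static final class ConnectionPool {
        private final Deque<Connection> idle = new ArrayDeque<>();

        ConnectionPool(int size) {
            for (int i = 0; i < size; i++) {
                idle.push(new Connection());
            }
        }

        Connection borrow() {
            Connection c = idle.poll();
            if (c == null) {
                throw new IllegalStateException("pool exhausted");
            }
            return c;
        }

        void giveBack(Connection c) {
            idle.push(c);
        }

        int idleCount() {
            return idle.size();
        }

        void close() {
            idle.clear();
        }
    }

    /** Mirrors the shape of a write DoFn (assumption: not tied to any runner). */
    static final class WriteFn {
        ConnectionPool pool;    // lives as long as the DoFn instance
        Connection connection;  // lives for one bundle
        int rowsWritten = 0;

        // Would carry @Setup: called once per DoFn instance.
        void setup() {
            pool = new ConnectionPool(2);
        }

        // Would carry @StartBundle: take one connection for this bundle.
        void startBundle() {
            connection = pool.borrow();
        }

        // Would carry @ProcessElement: write each element with the bundle's connection.
        void processElement(String row) {
            connection.write(row);
            rowsWritten++;
        }

        // Would carry @FinishBundle: hand the connection back to the pool.
        void finishBundle() {
            pool.giveBack(connection);
            connection = null;
        }

        // Would carry @Teardown: release the pool.
        void teardown() {
            pool.close();
            pool = null;
        }
    }

    public static void main(String[] args) {
        WriteFn fn = new WriteFn();
        fn.setup();
        // Two bundles processed by the same DoFn instance.
        for (List<String> bundle : List.of(List.of("a", "b"), List.of("c"))) {
            fn.startBundle();
            bundle.forEach(fn::processElement);
            fn.finishBundle();
        }
        System.out.println("rows written: " + fn.rowsWritten);
        fn.teardown();
    }
}
```

Keep in mind that a runner may retry a bundle, so writes issued this way can be repeated. That is why this only fits the case where you don't need strong delivery semantics.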
Take a look at the CassandraIO.Write [1] or JdbcIO.Write [2] implementations. Both are based on DoFn and could be helpful as examples.

[1] https://github.com/apache/beam/blob/61b665640d6c0f91751bba59782c0ac6aceacba6/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java#L1137
[2] https://github.com/apache/beam/blob/61b665640d6c0f91751bba59782c0ac6aceacba6/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1332

> On 18 Jun 2020, at 10:49, Kaymak, Tobias <[email protected]> wrote:
>
> So I came up with this - everything is handled within the same pipeline and
> after the CoGroupByKey for the articles (articleId) and assets (articleId)
> there is a ParDo doing RPC (not batched) towards Cassandra to first write out
> new asset information for an article, then fetch any historical per articleId
> from Cassandra, and then write the merged result out via BigQueryIO.
>
> My open question is - if I want to do RPC calls in the ParDo, I should handle
> my own Cassandra connection in it, one per ParDo, right?
>
> [Batching the RPCs is right now not a concern, as I expect Cassandra to scale
> for these reads and writes quite well.]
>
> <image.png>
>
> On Wed, Jun 17, 2020 at 6:28 PM Luke Cwik <[email protected]
> <mailto:[email protected]>> wrote:
> Your right, 2 TiB is a lot for some runners. Not all runners need to use
> memory, some may decide to use a backing disk to store state that isn't being
> used so it comes down to only state which is part of the working set.
>
> Your pipeline using a GBK followed by an enrichment ParDo makes sense.
>
> On Wed, Jun 17, 2020 at 9:20 AM Kaymak, Tobias <[email protected]
> <mailto:[email protected]>> wrote:
> What about having just one pipeline and a pre-loaded Cassandra with the
> history?
>
> Like: The pipeline reads from Kafka, then groups by key (for assets and
> articles that would be articleId) and then inserting the new assets to
> Cassandra inside a ParDo that also does a lookup for history for a given key
> (the enrichment) at the same time? Does that make sense?
>
> I fear that the stateful DoFn would need a lot of memory as well - so it
> would need to distribute the 2TiB among the workers (?)
>
> On Wed, Jun 17, 2020 at 6:01 PM Luke Cwik <[email protected]
> <mailto:[email protected]>> wrote:
> You would need to ingest all the data into the pipeline (the 2 TiB) to be
> able to have the pipeline manage the state. Side inputs would only work if
> you could find a runner that allows for streaming side inputs that are that
> large (none that I'm aware of).
>
> For a stateful DoFn, you would "flatten" all the data from cassandra with the
> Kafka data and feed this into a stateful DoFn. The cassandra cluster would be
> a bounded source so the watermark would prevent reading from Kafka till all
> the Cassandra data has been read into the pipeline. The stateful DoFn key
> would be whatever is the join criteria for the two streams (like a userId).
>
> On Wed, Jun 17, 2020 at 7:46 AM Kaymak, Tobias <[email protected]
> <mailto:[email protected]>> wrote:
> Thank you Luke.
>
> I do realize that the lag is a problem, but I fear I am not fully grasping
> what you mean with "relying on the pipeline managing the state".
> What is missing from the picture is that the Cassandra cluster will be
> pre-loaded with the complete history of data which is around 2TiB
> uncompressed, afterwards then it will get constantly updated through Kafka.
>
> If the two input streams only hold data for 14 days how would a CoGBK get the
> whole picture - or how could I rely on a side input or user state?
>
> On Tue, Jun 16, 2020 at 10:06 PM Luke Cwik <[email protected]
> <mailto:[email protected]>> wrote:
> One issue that you are going to run into is that you can't enforce that the
> history is up to date as the two pipelines are running with their own lag.
> Even within the same pipeline this is very hard to do correctly and a lot of
> times it is much easier to rely on the pipeline managing the state.
>
> Other than the CoGBK, you could also use user state or a side input to do the
> join.
>
> On Tue, Jun 16, 2020 at 2:24 AM Kaymak, Tobias <[email protected]
> <mailto:[email protected]>> wrote:
> Hello,
>
> I've attached my poor drawing to this email, that illustrates my idea:
>
> I am dealing with two unlimited very large streams of data that should be
> joined (articles and assets).
>
> My first step is to have a pipeline #1 that ingests all assets and streams
> them into a fast key-lookup store.
> My second step would be to have a pipeline #2 that uses a CoGroupByKey to
> join those two streams, then doing a batched RPC against that fast key-lookup
> store to fetch any historical data.
>
> This way I am making sure that I have the latest data always available (even
> when pipeline #1 is broken or late for a couple of minutes), and by doing the
> batched RPC to have always the complete history at hand (and I don't need to
> keep a huge state in Beam on the workers).
>
> Does this pattern make sense?
> <image.png>
> Cheers,
> Tobi
