Yes, if you don’t need strong delivery semantics, then you can simply 
instantiate a writer in the DoFn's @Setup method and reuse it for every bundle 
in your write DoFn. If the writer supports a pool of connections, then @Setup 
is a good place to create that too - then get a new connection for every 
bundle in @StartBundle.
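
To make the lifecycle concrete, here is a minimal sketch in plain Java with no Beam dependency. FakeConnection, FakePool and WriteFnSketch are hypothetical names invented for illustration; they are not Beam or Cassandra driver classes. In a real pipeline the class would extend DoFn and the methods would carry the annotations named in the comments.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for a real client connection (e.g. a Cassandra session). */
class FakeConnection {
    final List<String> written = new ArrayList<>();

    void write(String row) {
        written.add(row);
    }
}

/** Hypothetical, minimal pool; a real driver would normally ship its own. */
class FakePool {
    private final Deque<FakeConnection> idle = new ArrayDeque<>();
    int created = 0;

    FakeConnection acquire() {
        if (idle.isEmpty()) {
            created++;
            return new FakeConnection();
        }
        return idle.pop();
    }

    void release(FakeConnection connection) {
        idle.push(connection);
    }
}

/** Mirrors the DoFn lifecycle; assumption: one instance is reused across bundles. */
class WriteFnSketch {
    FakePool pool;             // created once per DoFn instance
    FakeConnection connection; // borrowed once per bundle

    // In Beam: @Setup
    void setup() {
        pool = new FakePool();
    }

    // In Beam: @StartBundle
    void startBundle() {
        connection = pool.acquire();
    }

    // In Beam: @ProcessElement
    void processElement(String element) {
        connection.write(element);
    }

    // In Beam: @FinishBundle
    void finishBundle() {
        pool.release(connection);
        connection = null;
    }

    // In Beam: @Teardown
    void teardown() {
        pool = null;
    }
}
```

A runner would call setup() once, then startBundle(), processElement(...) and finishBundle() for each bundle, and finally teardown(). Note that this gives no stronger guarantee than at-least-once writes, since a failed bundle may be retried.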

Take a look at the CassandraIO.Write [1] or JdbcIO.Write [2] implementations. 
Both are based on DoFn and could be helpful as examples.

[1] 
https://github.com/apache/beam/blob/61b665640d6c0f91751bba59782c0ac6aceacba6/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java#L1137
[2] 
https://github.com/apache/beam/blob/61b665640d6c0f91751bba59782c0ac6aceacba6/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1332

> On 18 Jun 2020, at 10:49, Kaymak, Tobias <[email protected]> wrote:
> 
> So I came up with this - everything is handled within the same pipeline and 
> after the CoGroupByKey for the articles (articleId) and assets (articleId) 
> there is a ParDo doing RPC (not batched) towards Cassandra to first write out 
> new asset information for an article, then fetch any historical per articleId 
> from Cassandra, and then write the merged result out via BigQueryIO.
> 
> My open question is - if I want to do RPC calls in the ParDo, I should handle 
> my own Cassandra connection in it, one per ParDo, right?
> 
> [Batching the RPCs is right now not a concern, as I expect Cassandra to scale 
> for these reads and writes quite well.] 
> 
> <image.png>
> 
> On Wed, Jun 17, 2020 at 6:28 PM Luke Cwik <[email protected] 
> <mailto:[email protected]>> wrote:
> You're right, 2 TiB is a lot for some runners. Not all runners need to use 
> memory, some may decide to use a backing disk to store state that isn't being 
> used so it comes down to only state which is part of the working set.
> 
> Your pipeline using a GBK followed by an enrichment ParDo makes sense.
> 
> On Wed, Jun 17, 2020 at 9:20 AM Kaymak, Tobias <[email protected] 
> <mailto:[email protected]>> wrote:
> What about having just one pipeline and a pre-loaded Cassandra with the 
> history? 
> 
> Like: The pipeline reads from Kafka, then groups by key (for assets and 
> articles that would be articleId) and then inserts the new assets into 
> Cassandra inside a ParDo that also does a lookup for history for a given key 
> (the enrichment) at the same time? Does that make sense?
> 
> I fear that the stateful DoFn would need a lot of memory as well - so it 
> would need to distribute the 2TiB among the workers (?)
> 
> On Wed, Jun 17, 2020 at 6:01 PM Luke Cwik <[email protected] 
> <mailto:[email protected]>> wrote:
> You would need to ingest all the data into the pipeline (the 2 TiB) to be 
> able to have the pipeline manage the state. Side inputs would only work if 
> you could find a runner that allows for streaming side inputs that are that 
> large (none that I'm aware of). 
> 
> For a stateful DoFn, you would "flatten" all the data from cassandra with the 
> Kafka data and feed this into a stateful DoFn. The cassandra cluster would be 
> a bounded source so the watermark would prevent reading from Kafka till all 
> the Cassandra data has been read into the pipeline. The stateful DoFn key 
> would be whatever is the join criteria for the two streams (like a userId).
> 
> 
> 
> On Wed, Jun 17, 2020 at 7:46 AM Kaymak, Tobias <[email protected] 
> <mailto:[email protected]>> wrote:
> Thank you Luke.
> 
> I do realize that the lag is a problem, but I fear I am not fully grasping 
> what you mean with "relying on the pipeline managing the state". 
> What is missing from the picture is that the Cassandra cluster will be 
> pre-loaded with the complete history of data which is around 2TiB 
> uncompressed; afterwards it will get constantly updated through Kafka.
> 
> If the two input streams only hold data for 14 days how would a CoGBK get the 
> whole picture - or how could I rely on a side input or user state?
> 
> On Tue, Jun 16, 2020 at 10:06 PM Luke Cwik <[email protected] 
> <mailto:[email protected]>> wrote:
> One issue that you are going to run into is that you can't enforce that the 
> history is up to date as the two pipelines are running with their own lag. 
> Even within the same pipeline this is very hard to do correctly and a lot of 
> times it is much easier to rely on the pipeline managing the state.
> 
> Other than the CoGBK, you could also use user state or a side input to do the 
> join.
> 
> 
> 
> On Tue, Jun 16, 2020 at 2:24 AM Kaymak, Tobias <[email protected] 
> <mailto:[email protected]>> wrote:
> Hello,
> 
> I've attached my poor drawing to this email, that illustrates my idea:
> 
> I am dealing with two unlimited very large streams of data that should be 
> joined (articles and assets). 
> 
> My first step is to have a pipeline #1 that ingests all assets and streams 
> them into a fast key-lookup store. 
> My second step would be to have a pipeline #2 that uses a CoGroupByKey to 
> join those two streams, then doing a batched RPC against that fast key-lookup 
> store to fetch any historical data.
> 
> This way I am making sure that I have the latest data always available (even 
> when pipeline #1 is broken or late for a couple of minutes), and by doing the 
> batched RPC to have always the complete history at hand (and I don't need to 
> keep a huge state in Beam on the workers).
> 
> Does this pattern make sense? 
> <image.png>
> Cheers,
> Tobi
