Hi Luke,

Thanks a lot for your answers; they are very useful for helping us understand how we should proceed with the pipeline implementation.
I still have a few other questions, and maybe you can help with some answers:

1. What is Windmill? The term shows up in the job logs quite a lot, but I never understood what it is. Is this the service that runs on the VMs and is responsible for state synchronization between the VMs and the state storage?
2. State size and its impact on GCP pricing: the Dataflow pricing details don't mention anything about this. Do you know more details?
3. Is there a way to get some metrics about the size of the state? (I've put a rough sketch of how we might instrument this ourselves at the end of this message.)

Regards,
Tudor

On Mon, Nov 22, 2021 at 7:27 PM Luke Cwik <[email protected]> wrote:

> On Mon, Nov 22, 2021 at 8:28 AM Tudor Plugaru <[email protected]> wrote:
>
>> Hi,
>> At our current company we have multiple Beam pipelines developed with the Python SDK, and we use Dataflow as the runner. In one specific pipeline we use state extensively to store information about the last event seen for each key. Since with our data we might have quite a lot of different keys, we were wondering whether there are limitations on the amount of data we can keep in state. The pipelines run in streaming mode and use the Streaming Engine. I did some research in this area before writing this email, but wasn't able to get a good understanding of how state is implemented under the hood and what kind of guarantees it provides. I found this article <https://flink.apache.org/2021/01/18/rocksdb.html> that explains how state is stored for the Flink runner, and since Beam can run on Flink, I assume the approach might be the same in Dataflow, but there are no docs describing it. We do have garbage-collection logic inside the pipeline that clears keys with no activity for some time; it currently runs every 10 minutes, but we would like to extend this to maybe 24h, and we're not sure that's possible because of the unknowns around state storage.
>>
>> Can you please provide some answers to the questions I have about how the state is stored?
>
> State is partitioned per key and window, so in aggregate it can scale to petabytes of data (though reach out to GCP engineering for really large pipelines to make sure your solution is efficient). You can start hitting limits once you store more than hundreds of MiBs per key and window, since the way Dataflow persists the data atomically inside Bigtable can hit row/column limits.
>
>> 1. I assume it's key-value storage, but what kind of service is used? Are we able to have access to it?
>
> It is Bigtable. You can't access it as of today. In a future version of pipeline snapshots you might be able to mutate the state while the pipeline isn't running.
>
>> 2. What kind of guarantees does it provide?
>
> State is either durably persisted together with all the output produced within a bundle, or discarded and the bundle is reprocessed. See more details in https://s.apache.org/beam-fn-api-processing-a-bundle and https://s.apache.org/beam-fn-state-api-and-bundle-processing
>
>> How is the state transitioned to a new, updated pipeline?
>
> The transform names are used to map existing state onto the new pipeline.
>
>> Can we reuse the state when we deploy a new version of the pipeline without updating?
>
> https://cloud.google.com/dataflow/docs/guides/using-snapshots is the closest that I could think of.
>> 3. Are there any resources where I can dig more into the implementation of the state?
>
> Beyond the public documentation I linked above about state, I don't think there is a whitepaper about the internal details of Dataflow's Streaming Engine.
>
>> Regards,
>> Tudor
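
P.S. For context, here is roughly what our stateful DoFn looks like today, heavily simplified: we keep the last event per key in a ReadModifyWrite state cell and reset a timer on every element so that keys with no activity eventually get cleared. The class name, the coder and the 24h timeout below are illustrative rather than our exact code.

import apache_beam as beam
from apache_beam import coders
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (ReadModifyWriteStateSpec,
                                              TimerSpec, on_timer)
from apache_beam.utils.timestamp import Duration

# Illustrative inactivity timeout (we currently GC after ~10 min, would like ~24h).
GC_TIMEOUT = Duration(seconds=24 * 60 * 60)


class TrackLastEventFn(beam.DoFn):
    """Keeps the last event per key and clears it after a period of inactivity."""

    LAST_EVENT = ReadModifyWriteStateSpec('last_event', coders.PickleCoder())
    GC_TIMER = TimerSpec('gc', TimeDomain.WATERMARK)

    def process(self,
                element,
                timestamp=beam.DoFn.TimestampParam,
                last_event=beam.DoFn.StateParam(LAST_EVENT),
                gc_timer=beam.DoFn.TimerParam(GC_TIMER)):
        key, event = element
        previous = last_event.read()  # None the first time we see this key
        last_event.write(event)
        # Push the GC timer forward on every element, so it only fires once the
        # key has seen no activity for GC_TIMEOUT (in event time).
        gc_timer.set(timestamp + GC_TIMEOUT)
        yield key, previous, event

    @on_timer(GC_TIMER)
    def expire(self, last_event=beam.DoFn.StateParam(LAST_EVENT)):
        # Drop the stored value so the key stops contributing to state size.
        last_event.clear()

We apply it on a keyed PCollection as 'TrackLastEvent' >> beam.ParDo(TrackLastEventFn()); given your point that transform names are used to map existing state onto the new pipeline, we'll make sure that name stays stable across deployments when we run with --update.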

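And on question 3: if there is nothing built in, I was thinking we could at least approximate it ourselves by reporting the serialized size of every value we write to state through a custom Beam metric, something along these lines (the 'state_monitoring' namespace and metric name are just placeholders):

import pickle

from apache_beam.metrics import Metrics

# Custom metrics should surface as user distributions in the Dataflow UI / Monitoring.
state_write_bytes = Metrics.distribution('state_monitoring', 'state_write_bytes')


def record_state_write(value):
    """Report the approximate serialized size of a value about to be written to state."""
    state_write_bytes.update(len(pickle.dumps(value)))

We would call record_state_write(event) right before last_event.write(event) in the DoFn above. It only captures the size of individual writes rather than the total bytes retained per key, but the distribution's max and mean should already help us spot keys drifting towards the per-key limits you mention.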