You might also be interested in this: https://issues.apache.org/jira/browse/SPARK-19031
On Tue, Jan 3, 2017 at 3:36 PM, Michael Armbrust <mich...@databricks.com> wrote:

> I think we should add something similar to mapWithState in 2.2. It would
> be great if you could add the description of your problem to this ticket:
> https://issues.apache.org/jira/browse/SPARK-19067
>
> On Mon, Jan 2, 2017 at 2:05 PM, Jeremy Smith <jeremy.sm...@acorns.com> wrote:
>
>> I have a question about state tracking in Structured Streaming.
>>
>> First let me briefly explain my use case: Given a mutable data source
>> (i.e. an RDBMS) from which we assume we can retrieve a set of newly
>> created row versions (a row that was created or updated between two given
>> `Offset`s, whatever those are), we can create a Structured Streaming
>> `Source` which retrieves the new row versions. Further assuming that every
>> logical row has some primary key, then as long as we can track the current
>> offset for each primary key, we can differentiate between new and updated
>> rows. Then, when a row is updated, we can record that the previous version
>> of that row expired at some particular time. That's essentially what I'm
>> trying to do. This would effectively give you an "event-sourcing" type of
>> historical/immutable log of changes out of a mutable data source.
>>
>> I noticed that in Spark 2.0.1 there was a concept of a StateStore, which
>> seemed like it would allow me to do exactly the tracking that I needed, so
>> I decided to try to use that built-in functionality rather than some
>> external key/value store for storing the current "version number" of each
>> primary key. There were a lot of hard-coded hoops I had to jump through,
>> but I eventually made it work by implementing some custom LogicalPlans and
>> SparkPlans around StateStore[Save/Restore]Exec.
>>
>> Now, in Spark 2.1.0 it seems to have gotten even further away from what
>> I was using it for: the keyExpressions of StateStoreSaveExec must include
>> a timestamp column, which means that those expressions are not really keys
>> (at least not for a logical row). So it appears I can't use it that way
>> anymore (I can't blame Spark for this, as I knew what I was getting into
>> when leveraging developer APIs). There are also several hard-coded checks
>> which now make it clear that StateStore functionality is only to be used
>> for streaming aggregates, which is not really what I'm doing.
>>
>> My question is: is there a good way to accomplish the above use case
>> within Structured Streaming? Or is this the wrong use case for the state
>> tracking functionality (which increasingly seems to be targeted toward
>> aggregates only)? Is there a plan for any kind of generalized
>> `mapWithState`-type functionality for Structured Streaming, or should I
>> just give up on that and use an external key/value store for my state
>> tracking?
>>
>> Thanks,
>> Jeremy
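The core of the use case described in the thread (deciding whether each incoming row version is a create or an update, and expiring the superseded version at the new offset) does not depend on Spark internals and can be sketched in a few lines. The following is a minimal, framework-agnostic illustration in Python; `RowVersionTracker` and its method are hypothetical names, not part of any Spark API:

```python
class RowVersionTracker:
    """Tracks the current offset per primary key, so a stream of row
    versions from a mutable source can be turned into an immutable
    change log. Hypothetical sketch, not a Spark API."""

    def __init__(self):
        # primary key -> offset of the currently valid row version
        self._current = {}

    def observe(self, key, offset):
        """Record a row version seen at `offset` for `key`.

        Returns ("new", None) for a newly created row, or
        ("updated", prev_offset) when the version that was current
        since `prev_offset` expires at `offset`."""
        prev = self._current.get(key)
        self._current[key] = offset
        if prev is None:
            return ("new", None)
        return ("updated", prev)


tracker = RowVersionTracker()
tracker.observe(1, 100)   # ("new", None): first version of row 1
tracker.observe(1, 105)   # ("updated", 100): version at offset 100 expired at 105
```

In a real pipeline the in-memory dictionary would be replaced by durable per-key state (Spark's StateStore, or an external key/value store), and the returned expiry information would be appended to the historical change log.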