HeartSaVioR commented on PR #43425: URL: https://github.com/apache/spark/pull/43425#issuecomment-1770211883
> it seems like we only expose partition Id as a metadata/internal column and that too only if its queried ? We don't seem to expose other cols such as batchId/operatorId etc. What is the reason for doing this ?

The main usage of a metadata column is to get data/row-dependent internal information. Batch ID and operator ID will be the same for all rows in the resulting DataFrame, so do we really need them? batchId might still be reasonable, as the default value is less obvious and users may want to know the exact value; it may be worth discussing whether we want to add it. The default value of operator ID is simply obvious, and users querying it should already know the value. Once they know it, withColumn is their friend (see the sketch further down).

> for some of the queries such as join/FMGWS, it seems that we have different formats for v1/v2 and the user needs to query it differently within the selectExpr. How does the user discover these fields ? Is it possible to keep the source schema homogenous here ?

Users can discover the schema before executing the query, via DataFrame.printSchema(). Specifically for stream-stream join, we expect users to specify the option joinSide so they do not have to care about the format version.

I understand we are directly exposing the state as it is, which may be less familiar/friendly to users and concerning from a UX perspective. My worry about homogenizing is that we would then need to do the same for every operator and every state format. The current implementation intentionally avoids coupling with the implementation details of the operators; stream-stream join is just one of the unfortunate cases. But we could file an issue and discuss whether that is the right direction or not.

> for join queries, what schema do we expose when a store name is explicitly specified vs not. I guess the ability to query a specific store name (esp the ones like right-keyToNumValues) is only really for debugging purposes in this case ?

For join queries with the joinSide option specified, all input columns are exposed as the value and the equality join columns are exposed as the key. If users omit the joinSide option, they need to specify the store name explicitly, and then we expose the internal data structure as it is (although we do not expect users to want to do that).

> Also, for join queries, where do we add the internal metadata cols like partitionId - not sure I found that

This is indeed missing. I'll leave a self-comment. Nice finding!

> for the tests, not sure I saw a simulation for the expected use-cases. for eg - some tests where we keep the streaming query running for a few batches and assert for certain conditions/state values along the way.

So you mean reading the state from the checkpoint without stopping the streaming query, right? The guarantee for this use case is weak: the state reader never blocks the streaming query from cleaning up a version the state reader may decide to read. It's feasible to add a test where the streaming query never goes beyond the version that would trigger cleanup for the version the state reader reads, but I'm wondering what we want to guarantee here. I'd probably explicitly document that this is not suitable while the streaming query is running and processing batches.

All tests actually do that, right? We run these queries for a few batches and verify the state values, so I'm not sure what you are referring to.
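Regarding the schema discovery and metadata column points above, here is a minimal sketch of the usage I have in mind, assuming the source name `statestore`, the `joinSide` option, and a `_partition_id` metadata column as introduced in this PR (names are illustrative and may still change):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().getOrCreate()

// Read the left side of a stream-stream join state; specifying joinSide
// hides the state format version from the user.
val stateDf = spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/path/to/checkpoint")   // placeholder checkpoint location

// Discover the key/value schema up front instead of guessing it.
stateDf.printSchema()

// The partition id is a metadata column, materialized only when selected.
stateDf.selectExpr("key", "value", "_partition_id").show()

// Operator ID (and batch ID) are constant across all rows, so users who
// already know the value can simply attach it themselves.
stateDf.withColumn("operatorId", lit(0L)).show()
```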
> Also, maybe around corruption detection where we artificially corrupt some values and show how the state reader can detect those ?

I was looking into an option that provides the key/value as binary, but wanted to punt on it as a further improvement. Thinking about this case more, I realized there are multiple scenarios: corruption happened but the RocksDB encoder still loads the data into UnsafeRow, vs corruption that would break the RocksDB encoder as well. The first one we could probably still handle on top of the general state store API. The second one would require modifying the state store API so that broken data is surfaced separately from normal data. That seems to incur more work than we anticipated, hence not suitable for the initial implementation. Probably the better approach is an option similar to the corrupt record column in the CSV/JSON data sources (see the sketch at the end of this comment). We could file an issue and look into it later.

> For tests, should we also add some cases with additional startStream/stopStream clauses and verify that state read is working as expected even when batch recovery/restart cases are involved ?

Not sure I understand this correctly. Mind elaborating which scenario you are looking for?
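For reference, this is the existing corrupt record column pattern in the JSON/CSV readers I'm referring to; a state source option could follow the same shape (the state-side column name is not decided here):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val spark = SparkSession.builder().getOrCreate()

// Records that fail to parse are kept, with the raw text routed into the
// designated corrupt record column instead of failing the whole read.
val schema = new StructType()
  .add("id", LongType)
  .add("name", StringType)
  .add("_corrupt_record", StringType)

val parsed = spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("/path/to/input.json")   // placeholder input path
  .cache()                       // cache before querying only the corrupt column

parsed.where("_corrupt_record IS NOT NULL").show(false)
```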
