HeartSaVioR commented on PR #43425:
URL: https://github.com/apache/spark/pull/43425#issuecomment-1770211883

   > it seems like we only expose partition Id as a metadata/internal column and that too only if it's queried? We don't seem to expose other cols such as batchId/operatorId etc. What is the reason for doing this?
   
   The main usage of the metadata column is to get data/row-dependent internal information. The batch ID and operator ID would be the same for all rows in the resulting DataFrame. Do we really need them?
   
   batchId might still be reasonable, as the default value is less obvious and users may want to know the exact value; it may be worth discussing whether we want to add batchId or not. The default value of the operator ID, on the other hand, is obvious, and users querying a non-default operator must already know its value. Once they know it, withColumn is their friend.
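   To make this concrete, here is a minimal sketch in Scala, assuming the reader options used in this PR (batchId, operatorId) and a _partition_id metadata column; names may differ in the final API:
   
   ```scala
   import org.apache.spark.sql.functions.lit
   
   val knownBatchId = 5L  // the batch the user has already decided to read
   
   val stateDf = spark.read
     .format("statestore")
     .option("batchId", knownBatchId)  // omitting it falls back to the latest committed batch
     .option("operatorId", 0)          // default is 0, so the user already knows it
     .load("/path/to/checkpoint")
   
   // The metadata column is only materialized when explicitly selected.
   val withPartition = stateDf.selectExpr("key", "value", "_partition_id")
   
   // batchId/operatorId are constant for every row, so withColumn suffices.
   val tagged = withPartition.withColumn("batchId", lit(knownBatchId))
   ```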
   
   > for some of the queries such as join/FMGWS, it seems that we have different formats for v1/v2 and the user needs to query it differently within the selectExpr. How does the user discover these fields? Is it possible to keep the source schema homogeneous here?
   
   Users can discover the schema before executing the query via DataFrame.printSchema(). Specifically for stream-stream join, we expect users to specify the joinSide option and not care about the format version.
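   For example (a sketch using the joinSide option from this PR), the schema can be inspected up front without knowing which join state format version wrote the checkpoint:
   
   ```scala
   // Inspect the state schema before running any query against it.
   val leftJoinState = spark.read
     .format("statestore")
     .option("joinSide", "left")  // "left" or "right"; hides the internal format version
     .load("/path/to/checkpoint")
   
   leftJoinState.printSchema()
   // key: the equi-join columns, value: all input columns of that side (see below)
   ```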
   
   I understand we are directly exposing the state as it is, which may be less familiar/friendly to users and concerning from a UX perspective. My worry about homogenizing is that we would then be required to do the same for every operator and every state format. The current implementation intentionally avoids coupling with the implementation details of the operators; stream-stream join is just one of the unfortunate cases.
   
   But we could file an issue and discuss whether it is the right direction or 
not.
   
   > for join queries, what schema do we expose when a store name is explicitly 
specified vs not. I guess the ability to query a specific store name (esp the 
ones like right-keyToNumValues) is only really for debugging purposes in this 
case ? 
   
   For join queries with the joinSide option specified, all input columns are exposed as the value and the equi-join columns are exposed as the key. If users omit the joinSide option, they need to specify the store name explicitly, and then we expose the internal data structure as it is (although we do not expect users to want to do that).
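   Roughly, the two modes look like this (a sketch; the storeName value mirrors internal store names such as right-keyToNumValues mentioned above):
   
   ```scala
   val checkpointDir = "/path/to/checkpoint"
   
   // 1) With joinSide: key = equi-join columns, value = all input columns of that side.
   val leftState = spark.read
     .format("statestore")
     .option("joinSide", "left")
     .load(checkpointDir)
   
   // 2) Without joinSide: name an internal store explicitly (debugging only);
   //    its raw internal layout is exposed as-is.
   val keyToNumValues = spark.read
     .format("statestore")
     .option("storeName", "right-keyToNumValues")
     .load(checkpointDir)
   ```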
   
   > Also, for join queries, where do we add the internal metadata cols like 
partitionId - not sure I found that
   
   This is a missing piece; I'll leave a self-comment. Nice finding!
   
   > for the tests, not sure I saw a simulation for the expected use-cases. for 
eg - some tests where we keep the streaming query running for a few batches and 
assert for certain conditions/state values along the way. 
   
   So you mean reading the state from the checkpoint without stopping the streaming query, right?
   
   The guarantee for this use case is weak - the state reader never blocks the streaming query from cleaning up the version the state reader may decide to read. It is feasible to add a test where the streaming query never advances past the point that would trigger cleanup of the version the state reader reads, but I'm wondering what we want to guarantee here. I'd probably explicitly document that this is not suitable for the case where the streaming query is still running and processing batches.
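   To make the caveat concrete (a sketch using the same options as above): nothing stops the running query's maintenance from purging the version being read, so a read like this is best-effort and may fail if the version has already been cleaned up.
   
   ```scala
   // Read a past version while the streaming query is still running.
   val oldState = spark.read
     .format("statestore")
     .option("batchId", 3)  // a version that maintenance may already have purged
     .load("/path/to/checkpoint")
   ```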
   
   All tests actually do that, right? We run these queries for a few batches, 
and verify the state values. I'm not sure what you are referring to.
   
   > Also, maybe around corruption detection where we artificially corrupt some 
values and show how the state reader can detect those ?
   
   I was looking into an option that provides the key/value as binary, but wanted to punt that to a further improvement.
   
   Furthermore, I thought about this case more and realized there are multiple scenarios: corruption that the RocksDB encoder still loads into an UnsafeRow anyway, vs. corruption that would break the RocksDB encoder as well. For the first one we could probably still handle it on top of the general state store API. For the second one, we would need to come up with a modification of the state store API that surfaces the broken data separately from the normal data. That seems to incur more work than we anticipated, hence not suitable for the initial implementation.
   
   Probably the better approach is to have an option similar to the corrupt record column in the CSV/JSON data sources. We could file an issue and look into it later.
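   For reference, the analogous behavior in the CSV/JSON data sources looks like the following (these options exist today; a state-source counterpart would be a follow-up):
   
   ```scala
   import org.apache.spark.sql.functions.col
   import org.apache.spark.sql.types._
   
   val schema = new StructType()
     .add("id", LongType)
     .add("name", StringType)
     .add("_corrupt_record", StringType)  // rows that fail to parse land here
   
   val df = spark.read
     .schema(schema)
     .option("mode", "PERMISSIVE")
     .option("columnNameOfCorruptRecord", "_corrupt_record")
     .json("/path/to/data.json")
   
   df.where(col("_corrupt_record").isNotNull).show(false)
   ```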
   
   > For tests, should we also add some cases with additional 
startStream/stopStream clauses and verify that state read is working as 
expected even when batch recovery/restart cases are involved ?
   
   Not sure I understand this correctly. Mind elaborating which scenario you 
are looking for?

