anishshri-db commented on code in PR #50177:
URL: https://github.com/apache/spark/pull/50177#discussion_r2049810090
##########
docs/streaming/structured-streaming-transform-with-state.md:
##########
@@ -0,0 +1,324 @@
+---
+layout: global
+displayTitle: Structured Streaming Programming Guide
+title: Structured Streaming Programming Guide
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+# Overview
+
+TransformWithState is the new arbitrary stateful operator in Structured Streaming, available since the Apache Spark 4.0 release. This operator is the next-generation replacement for the older mapGroupsWithState/flatMapGroupsWithState APIs for arbitrary stateful processing in Apache Spark.
+
+This operator supports a broad set of features such as object-oriented stateful processor definitions, composite state types, automatic TTL-based eviction, and timers, and can be used to build business-critical operational use cases.
+
+# Language Support
+
+`TransformWithState` is available in Scala, Java and Python. Note that in Python, the operator is called `transformWithStateInPandas`, similar to other operators that interact with the Pandas interface in Apache Spark.
+
+# Components of a TransformWithState Query
+
+A transformWithState query typically consists of the following components:
+- Stateful Processor - A user-defined stateful processor that defines the stateful logic
+- Output Mode - Output mode for the query, such as Append, Update, etc.
+- Time Mode - Time mode for the query, such as EventTime, ProcessingTime, etc.
+- Initial State - An optional initial state batch DataFrame used to pre-populate the state
+
+In the following sections, we will go through the above components in more detail.
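+For illustration, here is a minimal sketch of wiring these components together in Python, using the `DownTimeDetector` processor defined later in this guide. The input DataFrame `df`, its `user` key column, and the output schema are assumptions made for this sketch:
+
+{% highlight python %}
+from pyspark.sql.types import StructType, StructField, StringType, LongType
+
+# Assumed output schema produced by the stateful processor
+output_schema = StructType([
+    StructField("user", StringType(), True),
+    StructField("downtimeMs", LongType(), True)])
+
+# df is assumed to be a streaming DataFrame containing a "user" column
+result = (df
+    .groupBy("user")
+    .transformWithStateInPandas(
+        statefulProcessor=DownTimeDetector(),
+        outputStructType=output_schema,
+        outputMode="Update",
+        timeMode="ProcessingTime"))
+{% endhighlight %}
+
+An optional initial state can also be supplied to this call to pre-populate state, as discussed later in this guide.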
+
+## Defining a Stateful Processor
+
+A stateful processor is the core of the user-defined logic used to operate on the input events. A stateful processor is defined by extending the StatefulProcessor class and implementing a few methods.
+
+A typical stateful processor deals with the following constructs:
+- Input Records - Input records received by the stream
+- State Variables - Zero or more class-specific members used to store user state
+- Output Records - Output records produced by the processor. Zero or more output records may be produced by the processor.
+
+A stateful processor uses the object-oriented paradigm to define the stateful logic. The stateful logic is defined by implementing the following methods:
+ - `init` - Initialize the stateful processor and define any state variables as needed
+ - `handleInputRows` - Process input rows belonging to a grouping key and emit output if needed
+ - `handleExpiredTimer` - Handle expired timers and emit output if needed
+ - `close` - Perform any cleanup operations if needed
+ - `handleInitialState` - Optionally handle the initial state batch DataFrame
+
+The methods above are invoked by the Spark query engine when the operator is executed as part of a streaming query.
+
+Note also that not all types of operations are supported in each of the methods. For example, users cannot register timers in the `init` method. Similarly, they cannot operate on input rows in the `handleExpiredTimer` method. The engine detects unsupported/incompatible operations and fails the query if needed.
+
+### Using the StatefulProcessorHandle
+
+Many operations within the methods above can be performed using the `StatefulProcessorHandle` object. The `StatefulProcessorHandle` object provides methods to interact with the underlying state store. This object can be retrieved within the StatefulProcessor by invoking the `getHandle` method.
+
+### Using State Variables
+
+State variables are class-specific members used to store user state. They need to be declared once and initialized within the `init` method of the stateful processor.
+
+Initializing a state variable typically involves the following steps (a sketch follows this list):
+- Provide a unique name for the state variable (unique within the stateful processor definition)
+- Provide a type for the state variable (ValueState, ListState, MapState) - depending on the type, the appropriate method on the handle needs to be invoked
+- Provide a state encoder for the state variable (in Scala, this can be skipped if implicit encoders are available)
+- Provide an optional TTL config for the state variable
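+As an illustration, a minimal `init` method that declares one state variable of each type could look like the following in Python; the variable names and schemas here are hypothetical:
+
+{% highlight python %}
+from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
+from pyspark.sql.types import StructType, StructField, StringType, LongType
+
+class MyProcessor(StatefulProcessor):
+    def init(self, handle: StatefulProcessorHandle) -> None:
+        self.handle = handle
+        count_schema = StructType([StructField("count", LongType(), True)])
+        # Single value per grouping key
+        self.count = handle.getValueState("count", count_schema)
+        # Append-optimized list of values per grouping key
+        self.events = handle.getListState("events", count_schema)
+        # Point-lookup-optimized map per grouping key
+        key_schema = StructType([StructField("name", StringType(), True)])
+        self.metrics = handle.getMapState("metrics", key_schema, count_schema)
+        # A TTL can optionally be attached when creating a state variable;
+        # the argument name below is an assumption, see the API docs:
+        # self.count = handle.getValueState("count", count_schema, ttlDurationMs=3600000)
+{% endhighlight %}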
+
+### Types of state variables
+
+State variables can be of the following types:
+- Value State
+- List State
+- Map State
+
+Similar to collections in popular programming languages, these state types can be used to model data structures optimized for various types of operations in the underlying storage layer. For example, appends are optimized for ListState and point lookups are optimized for MapState.
+
+### Providing state encoders
+
+State encoders are used to serialize and deserialize the state variables. In Scala, the state encoders can be skipped if implicit encoders are available. In Java and Python, the state encoders need to be provided explicitly.
+Built-in encoders for primitives, case classes and Java Bean classes are provided by default via the Spark SQL encoders.
+
+#### Providing implicit encoders in Scala
+
+In Scala, implicit encoders can be provided for case classes and primitive types. The `implicits` object is provided as part of the `StatefulProcessor` class. Within the StatefulProcessor definition, users can simply add `import implicits._`, after which they do not need to pass the encoder type explicitly.
+
+### Providing TTL for state variables
+
+State variables can be configured with an optional TTL (Time-To-Live) value. The TTL value is used to automatically evict the state variable after the specified duration. The TTL value can be provided as a Duration.
+
+### Handling input rows
+
+The `handleInputRows` method is used to process input rows belonging to a grouping key and emit output if needed. The method is invoked by the Spark query engine for each grouping key value received by the operator. If multiple rows belong to the same grouping key, the provided iterator will include all those rows.
+
+### Handling expired timers
+
+Within the `handleInputRows` or `handleExpiredTimer` methods, the stateful processor can register timers to be triggered at a later time. The `handleExpiredTimer` method is invoked by the Spark query engine when a timer set by the stateful processor has expired. This method is invoked once for each expired timer.
+Here are a few timer properties that are supported (a combined sketch of `handleInputRows` and timer registration follows this list):
+- Multiple timers associated with the same grouping key can be registered
+- The engine provides the ability to list/add/remove timers as needed
+- Timers are also checkpointed as part of the query checkpoint and can be triggered on query restart
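+For illustration, here is a minimal sketch of a `handleInputRows` implementation that counts rows per key and re-registers a processing-time timer; the `count` state variable is carried over from the earlier sketch, and the 10-second interval is an arbitrary assumption:
+
+{% highlight python %}
+import pandas as pd
+from typing import Iterator
+
+class MyProcessor(StatefulProcessor):
+    # init as sketched earlier ...
+
+    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
+        # Accumulate the number of rows seen for this grouping key
+        count = self.count.get()[0] if self.count.exists() else 0
+        for pdf in rows:
+            count += len(pdf)
+        self.count.update((count,))
+        # Clear any previously registered timers, then register a new
+        # timer to fire 10 seconds from the current processing time
+        for expiry_ts in list(self.handle.listTimers()):
+            self.handle.deleteTimer(expiry_ts)
+        self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)
+        yield pd.DataFrame({"key": [key[0]], "count": [count]})
+{% endhighlight %}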
+
+### Handling initial state
+
+The `handleInitialState` method is used to optionally handle the initial state batch DataFrame. The initial state batch DataFrame is used to pre-populate the state for the stateful processor. The method is invoked by the Spark query engine when the initial state batch DataFrame is available.
+This method is called only once in the lifetime of the query, before any input rows are processed by the stateful processor. A sketch is shown below.
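+As an illustration, a minimal sketch of `handleInitialState` follows; the exact signature should be checked against the API docs, and the `count` state variable and `initialCount` column are assumptions carried over from the earlier sketches:
+
+{% highlight python %}
+class MyProcessor(StatefulProcessor):
+    # init and handleInputRows as sketched earlier ...
+
+    def handleInitialState(self, key, initialState, timerValues) -> None:
+        # initialState holds the pre-populated rows for this grouping key;
+        # seed the count state from it
+        self.count.update((int(initialState["initialCount"].iloc[0]),))
+{% endhighlight %}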
+
+### Putting it all together
+
+Here is an example of a StatefulProcessor that implements a downtime detector. Each time a new value is seen for a given key, it updates the `last_seen` state value, clears any existing timers, and resets a timer for the future.
+
+When a timer expires, the application emits the elapsed time since the last observed event for the key. It then sets a new timer to emit an update 10 seconds later.
+
+<div class="codetabs">
+
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+
+class DownTimeDetector(StatefulProcessor):
+    def init(self, handle: StatefulProcessorHandle) -> None:
+        # Define schema for the state value (timestamp)
+        state_schema = StructType([StructField("value", TimestampType(), True)])
+        self.handle = handle
+        # Initialize state to store the last seen timestamp for each key
+        self.last_seen = handle.getValueState("last_seen", state_schema)
+
+    def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
+        latest_from_existing = self.last_seen.get()

Review Comment:
   Done

##########
docs/streaming/structured-streaming-transform-with-state.md:
##########
@@ -0,0 +1,324 @@
+    def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
+        latest_from_existing = self.last_seen.get()
+        # Calculate downtime duration
+        downtime_duration = timerValues.getCurrentProcessingTimeInMs() - int(time.time() * 1000)

Review Comment:
   Done


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org