HeartSaVioR commented on code in PR #39538: URL: https://github.com/apache/spark/pull/39538#discussion_r1068980633
########## docs/structured-streaming-programming-guide.md: ##########

@@ -3569,7 +3569,63 @@ the effect of the change is not well-defined. For all of them:

 structures into bytes using an encoding/decoding scheme that supports schema migration. For example, if you save your state as Avro-encoded bytes, then you are free to change the Avro-state-schema between query restarts as the binary state will always be restored successfully.
+# Asynchronous Progress Tracking
+
+## What is it?
+
+Asynchronous progress tracking allows Structured Streaming pipelines to checkpoint progress asynchronously and in parallel to the actual data processing within a micro-batch, reducing latency associated with maintaining the offsetLog and commitLog.
+
+![Async Progress Tracking](img/async-progress.png)
+
+## How does it work?
+
+Structured Streaming relies on persisting and managing offsets as progress indicators for query processing. Offset management operation directly impacts processing latency, because no data processing can occur until these operations are complete. Asynchronous progress tracking enables Structured Streaming pipelines to checkpoint progress without being impacted by these offset management operations.
+
+## How to use it?
+
+The code snippet below provides an example of how to use this feature:
+```scala
+val stream = spark.readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("subscribe", "in")
+  .load()
+val query = stream.writeStream
+  .format("kafka")
+  .option("topic", "out")
+  .option("checkpointLocation", "/tmp/checkpoint")
+  .option("asyncProgressTrackingEnabled", "true")
+  .start()
+```
+
+The table below describes the configurations for this feature and default values associated with them.
+
+| Option | Value | Default | Description |
+|--------|-------|---------|-------------|
+| asyncProgressTrackingEnabled | true/false | false | enable or disable asynchronous progress tracking |
+| asyncProgressCheckpointingInterval | minutes | 1 | the interval in which we commit offsets and completion commits |
+
+## Limitations
+The initial version of the feature has the following limitations:
+
+* Asynchronous progress tracking is only supported in stateless pipelines using Kafka Sink
+* Exactly once end-to-end processing will not be supported with this asynchronous progress tracking because offset ranges for batch can be changed in case of failure. Though many sinks, such as Kafka sink, do not support writing exactly once anyways.
+
+## Switching the setting off
+Turning the async progress tracking off my cause the following exception to be thrown
+
+```scala
+java.lang.IllegalStateException: batch x doesn't exist
+```
+
+Also the following error message may be printed in the driver logs:
+
+```
+The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
+```
+
+This is caused by the fact that when async progress tracking is enabled, the framework will not checkpoint progress for every batch as would be done if async progress tracking is not used. To solve this problem simply re-enable "asyncProgressTrackingEnabled" and set "asyncProgressCheckpointingInterval" to 0 and run the streaming query until at least two micro-batches have been processed. Async progress tracking can be now safely disabled and restarting query should proceed normally.

Review Comment (on `+# Asynchronous Progress Tracking`):
    nit: one empty line

Review Comment (on `+Turning the async progress tracking off my cause the following exception to be thrown`):
    nit: my -> may

Review Comment (on the "How does it work?" paragraph):
    same: `pipelines` -> `queries`, maybe `Structured Streaming pipelines` -> `streaming queries`

Review Comment (on the exactly-once limitation bullet):
    nit: no double spaces.

Review Comment (on the "What is it?" paragraph):
    nit: offsetLog -> offset log, commitLog -> commit log.

Review Comment (on the "What is it?" paragraph):
    nit: pipelines -> queries, maybe `Structured Streaming pipelines` -> `streaming queries`

Review Comment (on the stateless-pipelines limitation bullet):
    nit: stateless query, no double spaces

Review Comment (on the last paragraph of "Switching the setting off"):
    nit: no double spaces.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
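For reference, the recovery procedure described in the quoted documentation — re-enable `asyncProgressTrackingEnabled` with `asyncProgressCheckpointingInterval` set to 0, run at least two micro-batches, then disable the option — could be sketched as follows. This is an illustrative sketch, not code from the PR: it reuses the `stream` source and the sink options from the quoted example, and the wait duration is an assumption that depends on the query's trigger interval.

```scala
// Step 1: temporarily re-enable async progress tracking, but force a
// progress checkpoint on every micro-batch by setting the interval to 0.
val recoveryQuery = stream.writeStream   // `stream` as defined in the quoted example
  .format("kafka")
  .option("topic", "out")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("asyncProgressTrackingEnabled", "true")
  .option("asyncProgressCheckpointingInterval", "0")
  .start()

// Step 2: run until at least two micro-batches have been committed, then stop.
// (The 2-minute wait here is an assumption; pick a duration that covers at
// least two micro-batches for your trigger interval.)
recoveryQuery.awaitTermination(120000L)
recoveryQuery.stop()

// Step 3: restart with async progress tracking disabled; the query should
// now recover from the checkpoint normally.
val normalQuery = stream.writeStream
  .format("kafka")
  .option("topic", "out")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("asyncProgressTrackingEnabled", "false")
  .start()
```

The key point is step 1's interval of 0: with a zero interval the offset and commit logs are written for every micro-batch, so after two batches the checkpoint again contains the two subsequent offset log entries that a non-async restart requires.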