HeartSaVioR commented on code in PR #39538:
URL: https://github.com/apache/spark/pull/39538#discussion_r1068980633


##########
docs/structured-streaming-programming-guide.md:
##########
@@ -3569,7 +3569,63 @@ the effect of the change is not well-defined. For all of them:
     structures into bytes using an encoding/decoding scheme that supports schema migration. For example,
     if you save your state as Avro-encoded bytes, then you are free to change the Avro-state-schema between query
     restarts as the binary state will always be restored successfully.
+# Asynchronous Progress Tracking

Review Comment:
   nit: one empty line



##########
docs/structured-streaming-programming-guide.md:
##########
@@ -3569,7 +3569,63 @@ the effect of the change is not well-defined. For all of them:
     structures into bytes using an encoding/decoding scheme that supports schema migration. For example,
     if you save your state as Avro-encoded bytes, then you are free to change the Avro-state-schema between query
     restarts as the binary state will always be restored successfully.
+# Asynchronous Progress Tracking
+  
+## What is it?
+
+Asynchronous progress tracking allows Structured Streaming pipelines to checkpoint progress asynchronously and in parallel to the actual data processing within a micro-batch, reducing latency associated with maintaining the offsetLog and commitLog.
+
+![Async Progress Tracking](img/async-progress.png)
+
+## How does it work?
+
+Structured Streaming relies on persisting and managing offsets as progress indicators for query processing. Offset management operation directly impacts processing latency, because no data processing can occur until these operations are complete. Asynchronous progress tracking enables Structured Streaming pipelines to checkpoint progress without being impacted by these offset management operations.
+
+## How to use it?
+
+The code snippet below provides an example of how to use this feature:
+```scala
+val stream = spark.readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+      .option("subscribe", "in")
+      .load()
+val query = stream.writeStream
+     .format("kafka")
+       .option("topic", "out")
+     .option("checkpointLocation", "/tmp/checkpoint")
+       .option("asyncProgressTrackingEnabled", "true")
+     .start()
+```
+
+The table below describes the configurations for this feature and default values associated with them.
+
+| Option    | Value           | Default | Description       |
+|-------------|-----------------|------------|---------------------|
+|asyncProgressTrackingEnabled|true/false|false|enable or disable asynchronous progress tracking|
+|asyncProgressCheckpointingInterval|minutes|1|the interval in which we commit offsets and completion commits|
+
+## Limitations
+The initial version of the feature has the following limitations:
+
+* Asynchronous progress tracking is only supported in  stateless pipelines using Kafka Sink
+* Exactly once end-to-end processing will not be supported with this asynchronous progress tracking  because offset ranges for batch can be changed in case of failure. Though many sinks, such as Kafka sink, do not support writing exactly once anyways.
+
+## Switching the setting off
+Turning the async progress tracking off my cause the following exception to be thrown

Review Comment:
   nit: my -> may



##########
docs/structured-streaming-programming-guide.md:
##########
@@ -3569,7 +3569,63 @@ the effect of the change is not well-defined. For all of them:
     structures into bytes using an encoding/decoding scheme that supports schema migration. For example,
     if you save your state as Avro-encoded bytes, then you are free to change the Avro-state-schema between query
     restarts as the binary state will always be restored successfully.
+# Asynchronous Progress Tracking
+  
+## What is it?
+
+Asynchronous progress tracking allows Structured Streaming pipelines to checkpoint progress asynchronously and in parallel to the actual data processing within a micro-batch, reducing latency associated with maintaining the offsetLog and commitLog.
+
+![Async Progress Tracking](img/async-progress.png)
+
+## How does it work?
+
+Structured Streaming relies on persisting and managing offsets as progress indicators for query processing. Offset management operation directly impacts processing latency, because no data processing can occur until these operations are complete. Asynchronous progress tracking enables Structured Streaming pipelines to checkpoint progress without being impacted by these offset management operations.

Review Comment:
   same: `pipelines` -> `queries`, maybe `Structured Streaming pipelines` -> `streaming queries`



##########
docs/structured-streaming-programming-guide.md:
##########
@@ -3569,7 +3569,63 @@ the effect of the change is not well-defined. For all of them:
     structures into bytes using an encoding/decoding scheme that supports schema migration. For example,
     if you save your state as Avro-encoded bytes, then you are free to change the Avro-state-schema between query
     restarts as the binary state will always be restored successfully.
+# Asynchronous Progress Tracking
+  
+## What is it?
+
+Asynchronous progress tracking allows Structured Streaming pipelines to checkpoint progress asynchronously and in parallel to the actual data processing within a micro-batch, reducing latency associated with maintaining the offsetLog and commitLog.
+
+![Async Progress Tracking](img/async-progress.png)
+
+## How does it work?
+
+Structured Streaming relies on persisting and managing offsets as progress indicators for query processing. Offset management operation directly impacts processing latency, because no data processing can occur until these operations are complete. Asynchronous progress tracking enables Structured Streaming pipelines to checkpoint progress without being impacted by these offset management operations.
+
+## How to use it?
+
+The code snippet below provides an example of how to use this feature:
+```scala
+val stream = spark.readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+      .option("subscribe", "in")
+      .load()
+val query = stream.writeStream
+     .format("kafka")
+       .option("topic", "out")
+     .option("checkpointLocation", "/tmp/checkpoint")
+       .option("asyncProgressTrackingEnabled", "true")
+     .start()
+```
+
+The table below describes the configurations for this feature and default values associated with them.
+
+| Option    | Value           | Default | Description       |
+|-------------|-----------------|------------|---------------------|
+|asyncProgressTrackingEnabled|true/false|false|enable or disable asynchronous progress tracking|
+|asyncProgressCheckpointingInterval|minutes|1|the interval in which we commit offsets and completion commits|
+
+## Limitations
+The initial version of the feature has the following limitations:
+
+* Asynchronous progress tracking is only supported in  stateless pipelines using Kafka Sink
+* Exactly once end-to-end processing will not be supported with this asynchronous progress tracking  because offset ranges for batch can be changed in case of failure. Though many sinks, such as Kafka sink, do not support writing exactly once anyways.

Review Comment:
   nit: no double spaces.



##########
docs/structured-streaming-programming-guide.md:
##########
@@ -3569,7 +3569,63 @@ the effect of the change is not well-defined. For all of them:
     structures into bytes using an encoding/decoding scheme that supports schema migration. For example,
     if you save your state as Avro-encoded bytes, then you are free to change the Avro-state-schema between query
     restarts as the binary state will always be restored successfully.
+# Asynchronous Progress Tracking
+  
+## What is it?
+
+Asynchronous progress tracking allows Structured Streaming pipelines to checkpoint progress asynchronously and in parallel to the actual data processing within a micro-batch, reducing latency associated with maintaining the offsetLog and commitLog.

Review Comment:
   nit: offsetLog -> offset log, commitLog -> commit log.



##########
docs/structured-streaming-programming-guide.md:
##########
@@ -3569,7 +3569,63 @@ the effect of the change is not well-defined. For all of them:
     structures into bytes using an encoding/decoding scheme that supports schema migration. For example,
     if you save your state as Avro-encoded bytes, then you are free to change the Avro-state-schema between query
     restarts as the binary state will always be restored successfully.
+# Asynchronous Progress Tracking
+  
+## What is it?
+
+Asynchronous progress tracking allows Structured Streaming pipelines to checkpoint progress asynchronously and in parallel to the actual data processing within a micro-batch, reducing latency associated with maintaining the offsetLog and commitLog.

Review Comment:
   nit: pipelines -> queries, maybe `Structured Streaming pipelines` -> `streaming queries`



##########
docs/structured-streaming-programming-guide.md:
##########
@@ -3569,7 +3569,63 @@ the effect of the change is not well-defined. For all of them:
     structures into bytes using an encoding/decoding scheme that supports schema migration. For example,
     if you save your state as Avro-encoded bytes, then you are free to change the Avro-state-schema between query
     restarts as the binary state will always be restored successfully.
+# Asynchronous Progress Tracking
+  
+## What is it?
+
+Asynchronous progress tracking allows Structured Streaming pipelines to checkpoint progress asynchronously and in parallel to the actual data processing within a micro-batch, reducing latency associated with maintaining the offsetLog and commitLog.
+
+![Async Progress Tracking](img/async-progress.png)
+
+## How does it work?
+
+Structured Streaming relies on persisting and managing offsets as progress indicators for query processing. Offset management operation directly impacts processing latency, because no data processing can occur until these operations are complete. Asynchronous progress tracking enables Structured Streaming pipelines to checkpoint progress without being impacted by these offset management operations.
+
+## How to use it?
+
+The code snippet below provides an example of how to use this feature:
+```scala
+val stream = spark.readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+      .option("subscribe", "in")
+      .load()
+val query = stream.writeStream
+     .format("kafka")
+       .option("topic", "out")
+     .option("checkpointLocation", "/tmp/checkpoint")
+       .option("asyncProgressTrackingEnabled", "true")
+     .start()
+```
+
+The table below describes the configurations for this feature and default values associated with them.
+
+| Option    | Value           | Default | Description       |
+|-------------|-----------------|------------|---------------------|
+|asyncProgressTrackingEnabled|true/false|false|enable or disable asynchronous progress tracking|
+|asyncProgressCheckpointingInterval|minutes|1|the interval in which we commit offsets and completion commits|
+
+## Limitations
+The initial version of the feature has the following limitations:
+
+* Asynchronous progress tracking is only supported in  stateless pipelines using Kafka Sink

Review Comment:
   nit: stateless query, no double spaces



##########
docs/structured-streaming-programming-guide.md:
##########
@@ -3569,7 +3569,63 @@ the effect of the change is not well-defined. For all of them:
     structures into bytes using an encoding/decoding scheme that supports schema migration. For example,
     if you save your state as Avro-encoded bytes, then you are free to change the Avro-state-schema between query
     restarts as the binary state will always be restored successfully.
+# Asynchronous Progress Tracking
+  
+## What is it?
+
+Asynchronous progress tracking allows Structured Streaming pipelines to checkpoint progress asynchronously and in parallel to the actual data processing within a micro-batch, reducing latency associated with maintaining the offsetLog and commitLog.
+
+![Async Progress Tracking](img/async-progress.png)
+
+## How does it work?
+
+Structured Streaming relies on persisting and managing offsets as progress indicators for query processing. Offset management operation directly impacts processing latency, because no data processing can occur until these operations are complete. Asynchronous progress tracking enables Structured Streaming pipelines to checkpoint progress without being impacted by these offset management operations.
+
+## How to use it?
+
+The code snippet below provides an example of how to use this feature:
+```scala
+val stream = spark.readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+      .option("subscribe", "in")
+      .load()
+val query = stream.writeStream
+     .format("kafka")
+       .option("topic", "out")
+     .option("checkpointLocation", "/tmp/checkpoint")
+       .option("asyncProgressTrackingEnabled", "true")
+     .start()
+```
+
+The table below describes the configurations for this feature and default values associated with them.
+
+| Option    | Value           | Default | Description       |
+|-------------|-----------------|------------|---------------------|
+|asyncProgressTrackingEnabled|true/false|false|enable or disable asynchronous progress tracking|
+|asyncProgressCheckpointingInterval|minutes|1|the interval in which we commit offsets and completion commits|
+
+## Limitations
+The initial version of the feature has the following limitations:
+
+* Asynchronous progress tracking is only supported in  stateless pipelines using Kafka Sink
+* Exactly once end-to-end processing will not be supported with this asynchronous progress tracking  because offset ranges for batch can be changed in case of failure. Though many sinks, such as Kafka sink, do not support writing exactly once anyways.
+
+## Switching the setting off
+Turning the async progress tracking off my cause the following exception to be thrown
+
+```scala
+java.lang.IllegalStateException: batch x doesn't exist
+```
+
+
+Also the following error message may be printed in the driver logs:
+
+```
+The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
+```
 
+This is caused by the fact that when async progress tracking is enabled, the framework will not checkpoint progress for every batch as would be done if async progress tracking is not used.  To solve this problem simply re-enable “asyncProgressTrackingEnabled” and set “asyncProgressCheckpointingInterval” to 0 and run the streaming query until at least two micro-batches have been processed.  Async progress tracking can be now safely disabled and restarting query should proceed normally.

Review Comment:
   nit: no double spaces.
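For readers following the thread, the recovery procedure described in the last quoted paragraph could be sketched roughly as below. This is a non-authoritative sketch based only on the quoted docs; the checkpoint path, servers, and topic names are the placeholders from the quoted example, and `spark` is assumed to be an existing `SparkSession`.

```scala
// Hypothetical recovery sketch: before turning async progress tracking off,
// restart the query once with the feature still enabled but with the
// checkpointing interval forced to 0, so progress is committed every batch.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "in")
  .load()

val recovery = stream.writeStream
  .format("kafka")
  .option("topic", "out")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("asyncProgressTrackingEnabled", "true")
  .option("asyncProgressCheckpointingInterval", "0")
  .start()

// Let at least two micro-batches complete, then stop the query.
// On the next restart the async options can be dropped, since the checkpoint
// now has the consecutive offset log entries the error message asks for.
```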



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

