[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584350#comment-16584350
 ] 

ASF GitHub Bot commented on KAFKA-3514:
---------------------------------------

mjsax closed pull request #5458: KAFKA-3514, Documentations: Add out of 
ordering in concepts.
URL: https://github.com/apache/kafka/pull/5458
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/streams/core-concepts.html b/docs/streams/core-concepts.html
index 3f9eab57ecd..e84d3d362a1 100644
--- a/docs/streams/core-concepts.html
+++ b/docs/streams/core-concepts.html
@@ -172,6 +172,37 @@ <h2><a id="streams_processing_guarantee" 
href="#streams_processing_guarantee">Pr
         More details can be found in the <a 
href="/{{version}}/documentation#streamsconfigs"><b>Kafka Streams 
Configs</b></a> section.
     </p>
 
+    <h3><a id="streams_out_of_ordering" 
href="#streams_out_of_ordering">Out-of-Order Handling</a></h3>
+
+    <p>
+        Besides the guarantee that each record will be processed exactly-once, 
another issue that many stream processing application will face is how to
+        handle <a href="tbd">out-of-order data</a> that may impact their 
business logic. In Kafka Streams, there are two causes that could potentially
+        result in out-of-order data arrivals with respect to their timestamps:
+    </p>
+
+    <ul>
+        <li> Within a topic-partition, a record's timestamp may not be 
monotonically increasing along with their offsets. Since Kafka Streams will 
always try to process records within a topic-partition to follow the offset 
order, it can cause records with larger timestamps
+             to be processed earlier than records with smaller timestamp in 
the same topic-partition.
+        </li>
+        <li> Within a <a 
href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks">stream
 task</a> that may be processing multiple topic-partitions, if users configure 
the application to not wait for all partitions to contain some buffered data and
+             pick from the partition with the smallest timestamp to process 
the next record, then later on when some records are fetched for other 
topic-partitions, their timestamps may be smaller than those processed records 
fetched from another topic-partition.
+        </li>
+    </ul>
+
+    <p>
+        For stateless operations, out-of-order data will not impact processing 
logic since only one record is considered at a time, without looking into the 
history of past processed records;
+        for stateful operations such as aggregations and joins, however, 
out-of-order data could cause the processing logic to be incorrect. If users 
want to handle such out-of-order data, generally they need to allow their 
applications
+        to wait for longer time while bookkeeping their states during the wait 
time, i.e. making trade-off decisions between latency, cost, and correctness.
+        In Kafka Streams specifically, users can configure their window 
operators for aggregations to achieve such trade-offs (details can be found in 
<a href="/{{version}}/documentation/streams/developer-guide"><b>Developer 
Guide</b></a>).
+        As for Joins, users have to be aware that some of the out-of-order 
data cannot be handled by increasing on latency and cost in Streams yet:
+    </p>
+
+    <ul>
+        <li> For Stream-Stream joins, all three types (inner, outer, left) 
handle out-of-order records correctly, but the resulted stream may contain 
unnecessary leftRecord-null for left joins, and leftRecord-null or 
null-rightRecord for outer joins. </li>
+        <li> For Stream-Table joins, out-of-order records are not handled 
(i.e., Streams applications don't check for out-of-order records and just 
process all records in offset order), and hence it may produce unpredictable 
results. </li>
+        <li> For Table-Table joins, out-of-order records are not handled 
(i.e., Streams applications don't check for out-of-order records and just 
process all records in offset order). However, the join result is a changelog 
stream and hence will be eventual consistent. </li>
+    </ul>
+
     <div class="pagination">
         <a href="/{{version}}/documentation/streams/tutorial" 
class="pagination__btn pagination__btn__prev">Previous</a>
         <a href="/{{version}}/documentation/streams/architecture" 
class="pagination__btn pagination__btn__next">Next</a>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Stream timestamp computation needs some further thoughts
> --------------------------------------------------------
>
>                 Key: KAFKA-3514
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3514
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>              Labels: architecture
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late-arriving record keeps that stream's timestamp low for 
> a period of time, and hence that stream keeps being processed until the late 
> record is reached. For example, take two partitions within the same task, 
> annotated by their timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) a partition whose buffer is empty will not have its timestamp advanced, 
> and hence neither will the task timestamp, since it is the smallest among all 
> partitions. This may not be as severe a problem as 1) above, though.
> *Update*
> There is one more thing to consider (full discussion found here: 
> http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor)
> {quote}
> Let's assume the following case.
> - a stream processor that uses the Processor API
> - context.schedule(1000) is called in the init()
> - the processor reads only one topic that has one partition
> - using custom timestamp extractor, but that timestamp is just a wall 
> clock time
> Imagine the following events:
> 1., for 10 seconds I send in 5 messages / second
> 2., does not send any messages for 3 seconds
> 3., starts the 5 messages / second again
> I see that punctuate() is not called during the 3 seconds when I do not 
> send any messages. This is ok according to the documentation, because 
> there are no new messages to trigger the punctuate() call. When the 
> first few messages arrive after sending restarts (point 3. above), I 
> see the following sequence of method calls:
> 1., process() on the 1st message
> 2., punctuate() is called 3 times
> 3., process() on the 2nd message
> 4., process() on each following message
> What I would expect instead is that punctuate() is called first and then 
> process() is called on the messages, because the first message's timestamp 
> is already 3 seconds later than the last punctuate() call, so the 
> first message belongs after the 3 punctuate() calls.
> {quote}
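
Corner case 1) in the quoted description can be reproduced with a small
standalone simulation. This is only a sketch under stated assumptions, not
Kafka Streams code: the class TaskTimestampSketch and its methods are
hypothetical names, a partition's timestamp is assumed to be the smallest
timestamp among its currently buffered records, all records are assumed to
be buffered up front, and empty partitions are simply skipped.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation; none of these names exist in Kafka Streams.
public class TaskTimestampSketch {

    // Assumption: a partition's timestamp is the smallest timestamp
    // among the records currently buffered for that partition.
    static long partitionTimestamp(Deque<Long> buffered) {
        return Collections.min(buffered);
    }

    // Repeatedly selects the non-empty partition with the smallest
    // partition timestamp and dequeues its head record (offset order).
    static List<String> processingOrder(Map<String, List<Long>> partitions) {
        Map<String, Deque<Long>> queues = new LinkedHashMap<>();
        partitions.forEach((name, ts) -> queues.put(name, new ArrayDeque<>(ts)));

        List<String> order = new ArrayList<>();
        while (true) {
            String next = null;
            long smallest = Long.MAX_VALUE;
            for (Map.Entry<String, Deque<Long>> e : queues.entrySet()) {
                if (e.getValue().isEmpty()) {
                    continue; // assumption: empty partitions are skipped
                }
                long ts = partitionTimestamp(e.getValue());
                if (next == null || ts < smallest) {
                    smallest = ts;
                    next = e.getKey();
                }
            }
            if (next == null) {
                break; // every buffer is drained
            }
            order.add(next + ":" + queues.get(next).poll());
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> partitions = new LinkedHashMap<>();
        partitions.put("A", List.of(5L, 6L, 7L, 8L, 9L, 1L, 10L));
        partitions.put("B", List.of(2L, 3L, 4L, 5L));
        // Prints [A:5, A:6, A:7, A:8, A:9, A:1, B:2, B:3, B:4, B:5, A:10]
        System.out.println(processingOrder(partitions));
    }
}
```

With the two example partitions, stream A is drained up to and including
the late record with timestamp 1 before stream B is selected, which matches
the behaviour described in 1).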



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
