I see, thanks. I suppose I was trying to avoid so much custom logic, which is
why I was initially looking at the time-based index functionality. Based on
what you’ve said so far, I take it that using the time-based index to find an
offset corresponding to a timestamp and then consuming all messages with a
smaller offset is not a viable solution?

Ray

On 2017-11-22, 12:12 AM, "Matthias J. Sax" <matth...@confluent.io> wrote:

    Using Kafka Streams, it seems reasonable to implement this using the
    low-level Processor API with a custom state store.
    
    Thus, you use the `StateStore` interface to implement your state store --
    this allows you to spill to disk if you need to handle state larger
    than main memory.
    
    If you want to browse some state store examples, you can check out the
    RocksDBStore class, which implements Kafka Streams' default `StateStore`.
    
    Within your custom `Processor`, you can access the state store
    accordingly to maintain the window etc.
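    
    As a rough sketch (using the old `Processor#punctuate()` variant of the
    API; the store name "sort-buffer" is made up, the store would have to be
    attached to the processor when building the topology, and a production
    version would need a composite key because records can share a
    timestamp), this could look like:
    
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;
    
    public class SortingProcessor implements Processor<String, String> {
    
        private static final long WINDOW_MS = 5 * 60 * 1000L; // allowed lateness
    
        private ProcessorContext context;
        // keyed by timestamp only for brevity -- equal timestamps would
        // collide, so use a composite key (timestamp + offset) in practice
        private KeyValueStore<Long, String> buffer;
        private long maxTs = Long.MIN_VALUE; // highest timestamp seen so far
    
        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            this.buffer = (KeyValueStore<Long, String>) context.getStateStore("sort-buffer");
            context.schedule(60_000L); // punctuate() roughly once per minute
        }
    
        @Override
        public void process(String key, String value) {
            long ts = context.timestamp();
            maxTs = Math.max(maxTs, ts);
            if (ts < maxTs - WINDOW_MS) {
                return; // more than 5 minutes behind the window: drop it
            }
            // with the default Long serde, non-negative keys iterate in
            // ascending timestamp order
            buffer.put(ts, value);
        }
    
        @Override
        public void punctuate(long streamTime) {
            List<Long> emitted = new ArrayList<>();
            // everything older than (stream time - 5 min) drops out of the window
            try (KeyValueIterator<Long, String> it = buffer.range(0L, streamTime - WINDOW_MS)) {
                while (it.hasNext()) {
                    KeyValue<Long, String> rec = it.next();
                    context.forward(rec.key, rec.value);
                    emitted.add(rec.key);
                }
            }
            for (Long ts : emitted) {
                buffer.delete(ts);
            }
        }
    
        @Override
        public void close() {}
    }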
    
    It's quite a special use case, and thus there is not much out-of-the-box
    support. You can check out some basic examples here:
    https://github.com/confluentinc/kafka-streams-examples
    
    One example implements a custom state store (but only in-memory):
    
https://github.com/confluentinc/kafka-streams-examples/blob/3.3.0-post/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala
    
    Hope this helps.
    
    
    -Matthias
    
    On 11/21/17 5:53 PM, Ray Ruvinskiy wrote:
    > Thanks for your reply! I am quite inexperienced when it comes to Kafka
    > and Kafka Streams and so would appreciate a little more guidance. How
    > would one keep messages within a sliding window sorted by timestamp?
    > Would the sort operation be done all in memory? I would potentially be
    > dealing with hundreds of thousands of messages per partition within
    > every 5-minute interval and so was looking for solutions that were not
    > necessarily limited by the amount of RAM.
    > 
    > Ray
    > 
    > On 2017-11-21, 5:57 PM, "Matthias J. Sax" <matth...@confluent.io> wrote:
    > 
    >     This is possible, but I think you don't need the time-based index
    >     for it :)
    >     
    >     You will just buffer up all messages for a 5-minute sliding window
    >     and maintain all messages sorted by timestamp within this window.
    >     Each time the window "moves", you write the oldest records that
    >     "drop out" of the window to the topic. If you get a record with an
    >     older timestamp than allowed, you don't insert it into the window
    >     but drop it.
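    >
    >     For illustration, the core buffering logic might look like this in
    >     plain Java (a minimal sketch; the names are made up, and one list
    >     per timestamp avoids collisions between records that carry equal
    >     timestamps):
    >
    >     import java.util.ArrayList;
    >     import java.util.List;
    >     import java.util.TreeMap;
    >
    >     public class SlidingSortBuffer {
    >         private final long windowMs = 5 * 60 * 1000L; // allowed lateness
    >         // sorted by timestamp; one list per timestamp
    >         private final TreeMap<Long, List<String>> buffer = new TreeMap<>();
    >         private long maxTs = Long.MIN_VALUE; // highest timestamp seen
    >
    >         /** Buffer a record, or drop it if it is older than allowed. */
    >         public void insert(long ts, String record) {
    >             maxTs = Math.max(maxTs, ts);
    >             if (ts < maxTs - windowMs) {
    >                 return; // too late: drop
    >             }
    >             buffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(record);
    >         }
    >
    >         /** Remove and return records that dropped out of the window,
    >          *  oldest first. */
    >         public List<String> evict() {
    >             List<String> out = new ArrayList<>();
    >             while (!buffer.isEmpty() && buffer.firstKey() < maxTs - windowMs) {
    >                 out.addAll(buffer.pollFirstEntry().getValue());
    >             }
    >             return out;
    >         }
    >     }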
    >     
    >     The timestamp index is useful if you want to seek to a specific
    >     offset based on timestamp. But I don't think you need this for your
    >     use case.
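    >
    >     For reference, that kind of lookup goes through the consumer's
    >     `offsetsForTimes()` -- a sketch, with a made-up topic name and
    >     broker address:
    >
    >     import java.util.Collections;
    >     import java.util.Map;
    >     import java.util.Properties;
    >     import org.apache.kafka.clients.consumer.KafkaConsumer;
    >     import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    >     import org.apache.kafka.common.TopicPartition;
    >
    >     public class SeekByTimestamp {
    >         public static void main(String[] args) {
    >             Properties props = new Properties();
    >             props.put("bootstrap.servers", "localhost:9092");
    >             props.put("key.deserializer",
    >                 "org.apache.kafka.common.serialization.StringDeserializer");
    >             props.put("value.deserializer",
    >                 "org.apache.kafka.common.serialization.StringDeserializer");
    >
    >             try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    >                 TopicPartition tp = new TopicPartition("events", 0);
    >                 consumer.assign(Collections.singletonList(tp));
    >
    >                 long target = System.currentTimeMillis() - 5 * 60 * 1000L; // T - 5 min
    >                 Map<TopicPartition, OffsetAndTimestamp> result =
    >                     consumer.offsetsForTimes(Collections.singletonMap(tp, target));
    >
    >                 OffsetAndTimestamp oat = result.get(tp);
    >                 if (oat != null) {
    >                     // earliest offset whose timestamp is >= target
    >                     consumer.seek(tp, oat.offset());
    >                 }
    >             }
    >         }
    >     }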
    >     
    >     
    >     
    >     -Matthias
    >     
    >     On 11/21/17 1:39 PM, Ray Ruvinskiy wrote:
    >     > I’ve been reading
    >     > https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
    >     > and trying to determine whether I can use the time-based index as
    >     > an efficient way to sort a stream of messages into timestamp
    >     > (CreateTime) order.
    >     > 
    >     > I am dealing with a number of sources emitting messages that are
    >     > then processed in a distributed fashion and written to a Kafka
    >     > topic. During this processing, the original order of the messages
    >     > is not strictly maintained. Each message has an embedded
    >     > timestamp. I’d like to be able to sort these messages back into
    >     > timestamp order, allowing for a certain lateness interval, before
    >     > processing them further. For example, supposing the lateness
    >     > interval is 5 minutes, at time T I’d like to consume from the
    >     > topic all messages with timestamp up to (T - 5 minutes), in
    >     > timestamp order. The assumption is that a message should be no
    >     > more than 5 minutes late; if it is more than 5 minutes late, it
    >     > can be discarded. Is this something that can be done with the
    >     > time-based index?
    >     > 
    >     > Thanks,
    >     > 
    >     > Ray
    >     > 
    >     
    >     
    > 
    
    
