Log compaction can also delete keys if the payload for a key is null:

"Compaction also allows for deletes. A message with a key and a null
payload will be treated as a delete from the log. This delete marker
will cause any prior message with that key to be removed (as would any
new message with that key), but delete markers are special in that they
will themselves be cleaned out of the log after a period of time. The
point in time at which deletes are no longer retained is given above."

See: https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction
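
To make the delete semantics concrete, here is a minimal sketch in plain
Python. It only simulates what the cleaner does to a log; it is not Kafka's
implementation or API. The compact() function, the drop_tombstones flag, and
the ad_* keys are made up for illustration.

```python
from typing import List, Optional, Tuple

# (key, value); a value of None models a null payload, i.e. a delete marker.
Record = Tuple[str, Optional[str]]


def compact(log: List[Record], drop_tombstones: bool = False) -> List[Record]:
    """Keep only the latest record per key, preserving log order.

    With drop_tombstones=True, delete markers (value None) are removed as
    well, modelling the log after the delete retention period has passed.
    """
    # Offset of the last occurrence of every key in the log.
    last_offset = {key: offset for offset, (key, _) in enumerate(log)}
    compacted = [
        record
        for offset, record in enumerate(log)
        if last_offset[record[0]] == offset
    ]
    if drop_tombstones:
        compacted = [record for record in compacted if record[1] is not None]
    return compacted


# Hypothetical log: ad_2 is deleted by a record with a null payload.
log = [("ad_1", "v1"), ("ad_2", "v1"), ("ad_1", "v2"), ("ad_2", None)]

after_cleaning = compact(log)                         # marker still present
after_retention = compact(log, drop_tombstones=True)  # marker cleaned out

print(after_cleaning)
print(after_retention)
```

In this model, a key only disappears from the compacted log once a
null-payload record has been written for it; without such a record, the
latest value for every key stays in the topic.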

Hope this helps.

-Matthias



On 04/20/2016 08:28 AM, Henry Cai wrote:
> In my case, the key space is unbounded.  The key would be something like
> 'ad_id', and this id is auto-incrementing all the time.  I understand the
> benefit of using a compacted Kafka topic for an aggregation store, but I
> don't see much benefit in using compaction to replicate records in a
> JoinWindow (there are not many duplicates in that window).  Can we specify
> not to use compaction for some state store replication?
> 
> A window expiration policy based purely on event time sounds risky: one
> out-of-order record will drop still-active windows.  We probably need a
> policy that depends on both stream time and event time.
> 
> I can file JIRAs for these two.  For the issue of controlling compaction
> time, I am not sure how to word the details, so I will leave that up to you.
> 
> 
> On Tue, Apr 19, 2016 at 6:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> 
>> Hi Henry,
>>
>> 1) Yes, if your key space is unlimited. But in practice, for KTable streams
>> where the record key (i.e. the primary key of the "table") is usually a
>> client-id, service-id, etc., the key space is usually bounded (for example,
>> by the population of the globe), in which case it should still be OK to
>> host it with parallel Kafka Streams instances :)
>>
>> 2) It is currently based on the record event time. More specifically, say
>> you have a new Window instance created at T0 with a maintenance interval of
>> 10: the first time we receive a record with timestamp T10, we will drop the
>> window. I think these semantics can be improved by using "stream time",
>> which is less vulnerable to early out-of-order records.
>>
>>
>> Do you want to create JIRAs for those issues I mentioned in the previous
>> emails to keep track?
>>
>>
>> Guozhang
>>
>>
>> On Tue, Apr 19, 2016 at 2:29 PM, Henry Cai <h...@pinterest.com.invalid>
>> wrote:
>>
>>> I have another follow-up question on the compacted Kafka topic for RocksDB
>>> replication.
>>>
>>> 1. From the Kafka compaction implementation, it looks like all keys from
>>> the past for that topic will be preserved (the compaction/cleaner will only
>>> delete records that have same-key occurrences later in the queue).  If
>>> that's the case, will we run out of disk space on the Kafka broker side for
>>> those compacted topics if we keep the stream application running too long?
>>>
>>> 2. For the various windows stored in RocksDB, when do we trigger the
>>> removal/expiration of those windows and keys from RocksDB?
>>>
>>>
>>> On Tue, Apr 19, 2016 at 12:27 PM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>>> 1) It sounds like you should be using KTable.outerJoin(KTable) in your
>>>> case, but keep in mind that we are still working on exactly-once
>>>> semantics, and hence the results may currently be ordering dependent.
>>>>
>>>> We do not support windowing in KTable since it is itself an ever-updating
>>>> changelog already, and hence its join result is also an ever-updating
>>>> changelog stream, i.e. a KTable. Reading data from a KTable where values
>>>> with the same key may not yet have been compacted is fine, as long as the
>>>> operation itself is key-preserving:
>>>>
>>>> F( {key: a, value: 1}, {key: a, value: 2} ) =>
>>>>     {key: b, value: 3}, {key: b, value: 4}
>>>>
>>>> Here the resulting keys may differ from the input keys, but the same input
>>>> key always generates the same output key, so the outputs are still
>>>> changelog records for the same key. All built-in KTable operators preserve
>>>> this property. On the other hand, if:
>>>>
>>>> F( {key: a, value: 1}, {key: a, value: 2} ) =>
>>>>     {key: b, value: 3}, {key: c, value: 4}
>>>>
>>>> Then it is not key-preserving, and you may encounter some unexpected
>>>> behavior.
>>>>
>>>>
>>>> 2) Log compaction is a Kafka broker feature that Kafka Streams leverages:
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction
>>>>
>>>> It is done on disk files that are not active (i.e. no longer take
>>>> appends).
>>>>
>>>> We are working on exposing the configs for log compaction, such as
>>>> compaction intervals and thresholds, in Kafka Streams so that users can
>>>> control its behavior. Actually, Henry, do you mind creating a JIRA for
>>>> this purpose and listing what you would like to control in log compaction?
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Apr 19, 2016 at 10:02 AM, Henry Cai <h...@pinterest.com.invalid>
>>>> wrote:
>>>>
>>>>> Related to the log compaction question ("it will be log compacted on the
>>>>> key over time"): how do we control the timing of log compaction?  For the
>>>>> log compaction implementation, is the storage used to map a new value for
>>>>> a given key kept in memory or on disk?
>>>>>
>>>>> On Tue, Apr 19, 2016 at 8:58 AM, Guillermo Lammers Corral <
>>>>> guillermo.lammers.cor...@tecsisa.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Thanks again for your reply :)
>>>>>>
>>>>>> 1) In my example, when I send a record from the outer table and there is
>>>>>> no matching record from the inner table, I receive data on the output
>>>>>> topic, and vice versa. I am trying it with the topics empty at the first
>>>>>> execution. How is that possible?
>>>>>>
>>>>>> Why do KTable joins not support windowing strategies? I think that for
>>>>>> this use case I need it; what do you think?
>>>>>>
>>>>>> 2) What does that mean? Although the log may not yet be compacted, there
>>>>>> should be no problem reading from it and executing a new stream process,
>>>>>> right? (like new joins, counts...).
>>>>>>
>>>>>> Thanks!!
>>>>>>
>>>>>> 2016-04-15 17:37 GMT+02:00 Guozhang Wang <wangg...@gmail.com>:
>>>>>>
>>>>>>> 1) There are three types of KTable-KTable joins; they follow the same
>>>>>>> semantics as SQL joins:
>>>>>>>
>>>>>>> KTable.join(KTable): when a new record is received from the outer table
>>>>>>> and there is no matching record in the inner table, no output; and vice
>>>>>>> versa.
>>>>>>> KTable.leftJoin(KTable): when a new record is received from the outer
>>>>>>> table and there is no matching record in the inner table, output
>>>>>>> (a, null); in the other direction, no output.
>>>>>>> KTable.outerJoin(KTable): when a new record is received from the outer /
>>>>>>> inner table and there is no matching record in the inner / outer table,
>>>>>>> output (a, null) or (null, b).
>>>>>>>
>>>>>>>
>>>>>>> 2) The result topic is also a changelog topic. Although it will be log
>>>>>>> compacted on the key over time, if you consume it immediately the log
>>>>>>> may not yet be compacted.
>>>>>>>
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>> On Fri, Apr 15, 2016 at 2:11 AM, Guillermo Lammers Corral <
>>>>>>> guillermo.lammers.cor...@tecsisa.com> wrote:
>>>>>>>
>>>>>>>> Hi Guozhang,
>>>>>>>>
>>>>>>>> Thank you very much for your reply, and sorry for the generic question.
>>>>>>>> I'll try to explain with some pseudocode.
>>>>>>>>
>>>>>>>> I have two KTables with a join:
>>>>>>>>
>>>>>>>> ktable1: KTable[String, String] = builder.table("topic1")
>>>>>>>> ktable2: KTable[String, String] = builder.table("topic2")
>>>>>>>>
>>>>>>>> result: KTable[String, ResultUnion] =
>>>>>>>>   ktable1.join(ktable2, (data1, data2) => new ResultUnion(data1, data2))
>>>>>>>>
>>>>>>>> I send the result to a topic result.to("resultTopic").
>>>>>>>>
>>>>>>>> My questions relate to the following scenario:
>>>>>>>>
>>>>>>>> - The streaming application is up & running without data in the topics
>>>>>>>>
>>>>>>>> - I send data to "topic2", for example a key/value pair like
>>>>>>>> ("uniqueKey1", "hello")
>>>>>>>>
>>>>>>>> - I see null values in topic "resultTopic", i.e. ("uniqueKey1", null)
>>>>>>>>
>>>>>>>> - If I send data to "topic1", for example a key/value pair like
>>>>>>>> ("uniqueKey1", "world"), then I see these values in topic "resultTopic":
>>>>>>>> ("uniqueKey1", ResultUnion("hello", "world"))
>>>>>>>>
>>>>>>>> Q: If we send data for one of the KTables that has no corresponding
>>>>>>>> data by key in the other one, is obtaining null values in the final
>>>>>>>> result topic the expected behavior?
>>>>>>>>
>>>>>>>> My next step would be to use Kafka Connect to persist the result data
>>>>>>>> in C* (I have not read the Connector docs yet...). Is this the way to
>>>>>>>> do it? (I mean preparing the data in the topic.)
>>>>>>>>
>>>>>>>> Q: On the other hand, just to try, I have a KTable that reads messages
>>>>>>>> from "resultTopic" and prints them. If the stream is a KTable, I am
>>>>>>>> wondering why it is getting all the values from the topic, even those
>>>>>>>> with the same key?
>>>>>>>>
>>>>>>>> Thanks in advance! Great job answering the community!
>>>>>>>>
>>>>>>>> 2016-04-14 20:00 GMT+02:00 Guozhang Wang <wangg...@gmail.com>:
>>>>>>>>
>>>>>>>>> Hi Guillermo,
>>>>>>>>>
>>>>>>>>> 1) Yes, in your case the streams are really "changelog" streams, hence
>>>>>>>>> you should create them as KTables and do a KTable-KTable join.
>>>>>>>>>
>>>>>>>>> 2) Could you elaborate on "achieving this"? What behavior do you
>>>>>>>>> require in the application logic?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Apr 14, 2016 at 1:30 AM, Guillermo Lammers Corral <
>>>>>>>>> guillermo.lammers.cor...@tecsisa.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I am a newbie to Kafka Streams and I am trying to use it to solve a
>>>>>>>>>> particular use case. Let me explain.
>>>>>>>>>>
>>>>>>>>>> I have two sources of data, both structured like this:
>>>>>>>>>>
>>>>>>>>>> Key (string)
>>>>>>>>>> DateTime (hourly granularity)
>>>>>>>>>> Value
>>>>>>>>>>
>>>>>>>>>> I need to join the two sources by key and date (hour of day) to
>>>>>>>>>> obtain:
>>>>>>>>>>
>>>>>>>>>> Key (string)
>>>>>>>>>> DateTime (hourly granularity)
>>>>>>>>>> ValueSource1
>>>>>>>>>> ValueSource2
>>>>>>>>>>
>>>>>>>>>> I think that first I'd need to push the messages to Kafka topics with
>>>>>>>>>> the date as part of the key, because I'll group by key taking the date
>>>>>>>>>> into account. So maybe the key must be a new string like
>>>>>>>>>> key_timestamp. But, of course, that is not the main problem, just an
>>>>>>>>>> additional explanation.
>>>>>>>>>>
>>>>>>>>>> Ok, so data are in topics, here we go!
>>>>>>>>>>
>>>>>>>>>> - Multiple records are allowed per key, but only the latest value for
>>>>>>>>>> a record key will be considered. I should use two KTables with some
>>>>>>>>>> join strategy, right?
>>>>>>>>>>
>>>>>>>>>> - Data from both sources could arrive at any time. What can I do to
>>>>>>>>>> achieve this?
>>>>>>>>>>
>>>>>>>>>> Thanks in advance.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
> 
