Created two Kafka JIRAs:

KAFKA-3595: Add capability to specify replication compact option for stream
store

KAFKA-3596: Kafka Streams: Window expiration needs to consider more than
event time

On Wed, Apr 20, 2016 at 11:43 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Henry,
>
> Yes for joining windows the key is actually a combo of {join window, key,
> sequenceID} and hence all records are unique, we do not need log compaction
> for its changelogs.
>
> Guozhang
>
>
> On Tue, Apr 19, 2016 at 11:28 PM, Henry Cai <h...@pinterest.com.invalid>
> wrote:
>
> > In my case, the key space is unbounded.  The key would be something like
> > 'ad_id', this id is auto incrementing all the time.  I understand the
> > benefit of using compacted kafka topic for aggregation store, but I don't
> > see much benefit of using compaction to replicate records in JoinWindow
> > (there are not many duplicates in that window).  Can we specify not to
> use
> > compaction for some state store replication?
> >
> > The window expiration policy on pure event time sounds risky, one
> > out-of-order record will drop still active windows.  We probably need a
> > policy to depend on both stream time and event time.
> >
> > I can fire JIRAs for these two.  For the issue of controlling compaction
> > time, I am not sure how to word the details, I will leave this up to you.
> >
> >
> > On Tue, Apr 19, 2016 at 6:19 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hi Henry,
> > >
> > > 1) Yes, if your key space is unlimited. But in practice, for KTable
> > streams
> > > where the record key (i.e. the primary key of the "table") is usually a
> > > client-id, service-id, etc, the key space is usually bounded, for
> example
> > > by the population of the globe, where in this case it should still be
> OK
> > to
> > > host with parallel Kafka Streams instances :)
> > >
> > > 2) It is currently based on the record event time. More specifically,
> > > currently say you have a new Window instance created at T0 with
> > maintenance
> > > interval 10, then the first time we received a record with timestamp
> T10,
> > > we will drop the window. I think this semantics can be improved to
> > "stream
> > > time", which is less vulnerable to early out-of-ordering records.
> > >
> > >
> > > Do you want to create JIRAs for those issues I mentioned in the
> previous
> > > emails to keep track?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Apr 19, 2016 at 2:29 PM, Henry Cai <h...@pinterest.com.invalid
> >
> > > wrote:
> > >
> > > > I have another follow-up question on the compacted kafka topic for
> > > RocksDB
> > > > replication.
> > > >
> > > > 1. From Kafka compaction implementation, looks like all keys from the
> > > past
> > > > for that topic will be preserved, (the compaction/cleaner will only
> > > delete
> > > > the records which has same-key occurrences later in the queue).  If
> > > that's
> > > > the case, will we run out of disk space on kafka broker side for
> those
> > > > compacted topics if we keep the stream application runs too long?
> > > >
> > > > 2. For the various windows stored in RocksDB, when we do trigger the
> > > > removal/expiration of those window and keys from RocksDB?
> > > >
> > > >
> > > > On Tue, Apr 19, 2016 at 12:27 PM, Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > > > 1) It sounds your should be using KTable.outerjoin(KTable) with
> your
> > > > case,
> > > > > but keep in mind that currently we are still working on
> exactly-once
> > > > > semantics, and hence currently the results may be ordering
> dependent.
> > > > >
> > > > > We do not support windowing in KTable since itself is an
> > ever-updating
> > > > > changlog already, and hence its join result would also be a ever
> > > updating
> > > > > changelog stream as KTable. Reading data from KTable where values
> > with
> > > > the
> > > > > same key may not yet been compacted as fine, as long as the
> operation
> > > > > itself is preserving :
> > > > >
> > > > > F( {key: a, value: 1}, {key: a, value: 2} ) => {key: b, value: 3},
> > > {key:
> > > > b,
> > > > > value: 4}
> > > > >
> > > > > Here the resulted key values may be different, but the same key
> input
> > > > will
> > > > > generate the same key output. Then they are still changelog records
> > for
> > > > the
> > > > > same key. All built-in KTable operators preserve this property. On
> > the
> > > > > other hand, if:
> > > > >
> > > > > F( {key: a, value: 1}, {key: a, value: 2} ) => {key: b, value: 3},
> > > {key:
> > > > c,
> > > > > value: 4}
> > > > >
> > > > > The it is not key-preserving, and then you may encounter some
> > > unexpected
> > > > > behavior.
> > > > >
> > > > >
> > > > > 2) log compaction is a Kafka broker feature that Kafka Streams
> > leverage
> > > > on:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction
> > > > >
> > > > > It is done on disk files that are not active (i.e. no longer takes
> > > > > appends).
> > > > >
> > > > > We are working on exposing the configs for log compactions such as
> > > > > compaction intervals and thresholds in Kafka Streams so that users
> > can
> > > > > control its behavior. Actually, Henry do you mind creating a JIRA
> for
> > > > this
> > > > > purpose and list what you would like to control log compaction?
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Apr 19, 2016 at 10:02 AM, Henry Cai
> > <h...@pinterest.com.invalid
> > > >
> > > > > wrote:
> > > > >
> > > > > > Related to the log compaction question: " it will be log
> > > > > > compacted on the key over time", how do we control the time for
> log
> > > > > > compaction?  For the log compaction implementation, is the
> storage
> > > used
> > > > > to
> > > > > > map a new value for a given key stored in memory or on disk?
> > > > > >
> > > > > > On Tue, Apr 19, 2016 at 8:58 AM, Guillermo Lammers Corral <
> > > > > > guillermo.lammers.cor...@tecsisa.com> wrote:
> > > > > >
> > > > > > > Hello,
> > > > > > >
> > > > > > > Thanks again for your reply :)
> > > > > > >
> > > > > > > 1) In my example when I send a record from outer table and
> there
> > is
> > > > no
> > > > > > > matching record from inner table I receive data to the output
> > topic
> > > > and
> > > > > > > vice versa. I am trying it with the topics empties at the first
> > > > > > execution.
> > > > > > > How is possible?
> > > > > > >
> > > > > > > Why KTable joins does not support windowing strategies? I think
> > > that
> > > > > for
> > > > > > > this use cases I need it, what do you think?
> > > > > > >
> > > > > > > 2) What does it means? Although the log may not be yet
> compacted,
> > > > there
> > > > > > > should be no problem to read from them and execute a new stream
> > > > > process,
> > > > > > > right? (like a new joins, counts...).
> > > > > > >
> > > > > > > Thanks!!
> > > > > > >
> > > > > > > 2016-04-15 17:37 GMT+02:00 Guozhang Wang <wangg...@gmail.com>:
> > > > > > >
> > > > > > > > 1) There are three types of joins for KTable-KTable join, the
> > > > follow
> > > > > > the
> > > > > > > > same semantics in SQL joins:
> > > > > > > >
> > > > > > > > KTable.join(KTable): when there is no matching record from
> > inner
> > > > > table
> > > > > > > when
> > > > > > > > received a new record from outer table, no output; and vice
> > > versa.
> > > > > > > > KTable.leftjoin(KTable): when there is no matching record
> from
> > > > inner
> > > > > > > table
> > > > > > > > when received a new record from outer table, output (a,
> null);
> > on
> > > > the
> > > > > > > other
> > > > > > > > direction no output.
> > > > > > > > KTable.outerjoin(KTable): when there is no matching record
> from
> > > > > inner /
> > > > > > > > outer table when received a new record from outer / inner
> > table,
> > > > > output
> > > > > > > (a,
> > > > > > > > null) or (null, b).
> > > > > > > >
> > > > > > > >
> > > > > > > > 2) The result topic is also a changelog topic, although it
> will
> > > be
> > > > > log
> > > > > > > > compacted on the key over time, if you consume immediately
> the
> > > log
> > > > > may
> > > > > > > not
> > > > > > > > be yet compacted.
> > > > > > > >
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > > On Fri, Apr 15, 2016 at 2:11 AM, Guillermo Lammers Corral <
> > > > > > > > guillermo.lammers.cor...@tecsisa.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Guozhang,
> > > > > > > > >
> > > > > > > > > Thank you very much for your reply and sorry for the
> generic
> > > > > > question,
> > > > > > > > I'll
> > > > > > > > > try to explain with some pseudocode.
> > > > > > > > >
> > > > > > > > > I have two KTable with a join:
> > > > > > > > >
> > > > > > > > > ktable1: KTable[String, String] = builder.table("topic1")
> > > > > > > > > ktable2: KTable[String, String] = builder.table("topic2")
> > > > > > > > >
> > > > > > > > > result: KTable[String, ResultUnion] =
> > > > > > > > > ktable1.join(ktable2, (data1, data2) => new
> > ResultUnion(data1,
> > > > > > data2))
> > > > > > > > >
> > > > > > > > > I send the result to a topic result.to("resultTopic").
> > > > > > > > >
> > > > > > > > > My questions are related with the following scenario:
> > > > > > > > >
> > > > > > > > > - The streming is up & running without data in topics
> > > > > > > > >
> > > > > > > > > - I send data to "topic2", for example a key/value like
> that
> > > > > > > > ("uniqueKey1",
> > > > > > > > > "hello")
> > > > > > > > >
> > > > > > > > > - I see null values in topic "resultTopic", i.e.
> > ("uniqueKey1",
> > > > > null)
> > > > > > > > >
> > > > > > > > > - If I send data to "topic1", for example a key/value like
> > that
> > > > > > > > > ("uniqueKey1", "world") then I see this values in topic
> > > > > > "resultTopic",
> > > > > > > > > ("uniqueKey1", ResultUnion("hello", "world"))
> > > > > > > > >
> > > > > > > > > Q: If we send data for one of the KTable that does not have
> > the
> > > > > > > > > corresponding data by key in the other one, obtain null
> > values
> > > in
> > > > > the
> > > > > > > > > result final topic is the expected behavior?
> > > > > > > > >
> > > > > > > > > My next step would be use Kafka Connect to persist result
> > data
> > > in
> > > > > C*
> > > > > > (I
> > > > > > > > > have not read yet the Connector docs...), is this the way
> to
> > do
> > > > it?
> > > > > > (I
> > > > > > > > mean
> > > > > > > > > prepare the data in the topic).
> > > > > > > > >
> > > > > > > > > Q: On the other hand, just to try, I have a KTable that
> read
> > > > > messages
> > > > > > > in
> > > > > > > > > "resultTopic" and prints them. If the stream is a KTable I
> am
> > > > > > wondering
> > > > > > > > why
> > > > > > > > > is getting all the values from the topic even those with
> the
> > > same
> > > > > > key?
> > > > > > > > >
> > > > > > > > > Thanks in advance! Great job answering community!
> > > > > > > > >
> > > > > > > > > 2016-04-14 20:00 GMT+02:00 Guozhang Wang <
> wangg...@gmail.com
> > >:
> > > > > > > > >
> > > > > > > > > > Hi Guillermo,
> > > > > > > > > >
> > > > > > > > > > 1) Yes in your case, the streams are really a "changelog"
> > > > stream,
> > > > > > > hence
> > > > > > > > > you
> > > > > > > > > > should create the stream as KTable, and do KTable-KTable
> > > join.
> > > > > > > > > >
> > > > > > > > > > 2) Could elaborate about "achieving this"? What behavior
> do
> > > > > require
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > > application logic?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Guozhang
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Thu, Apr 14, 2016 at 1:30 AM, Guillermo Lammers
> Corral <
> > > > > > > > > > guillermo.lammers.cor...@tecsisa.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > I am a newbie to Kafka Streams and I am using it trying
> > to
> > > > > solve
> > > > > > a
> > > > > > > > > > > particular use case. Let me explain.
> > > > > > > > > > >
> > > > > > > > > > > I have two sources of data both like that:
> > > > > > > > > > >
> > > > > > > > > > > Key (string)
> > > > > > > > > > > DateTime (hourly granularity)
> > > > > > > > > > > Value
> > > > > > > > > > >
> > > > > > > > > > > I need to join the two sources by key and date (hour of
> > > day)
> > > > to
> > > > > > > > obtain:
> > > > > > > > > > >
> > > > > > > > > > > Key (string)
> > > > > > > > > > > DateTime (hourly granularity)
> > > > > > > > > > > ValueSource1
> > > > > > > > > > > ValueSource2
> > > > > > > > > > >
> > > > > > > > > > > I think that first I'd need to push the messages in
> Kafka
> > > > > topics
> > > > > > > with
> > > > > > > > > the
> > > > > > > > > > > date as part of the key because I'll group by key
> taking
> > > into
> > > > > > > account
> > > > > > > > > the
> > > > > > > > > > > date. So maybe the key must be a new string like
> > > > key_timestamp.
> > > > > > > But,
> > > > > > > > of
> > > > > > > > > > > course, it is not the main problem, is just an
> additional
> > > > > > > > explanation.
> > > > > > > > > > >
> > > > > > > > > > > Ok, so data are in topics, here we go!
> > > > > > > > > > >
> > > > > > > > > > > - Multiple records allows per key but only the latest
> > value
> > > > > for a
> > > > > > > > > record
> > > > > > > > > > > key will be considered. I should use two KTable with
> some
> > > > join
> > > > > > > > > strategy,
> > > > > > > > > > > right?
> > > > > > > > > > >
> > > > > > > > > > > - Data of both sources could arrive at any time. What
> > can I
> > > > do
> > > > > to
> > > > > > > > > achieve
> > > > > > > > > > > this?
> > > > > > > > > > >
> > > > > > > > > > > Thanks in advance.
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > -- Guozhang
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to