I'm hoping the DSL will do what I want :) Currently the example is
continuously adding instead of bucketing, so if I modify it by adding a
window to the count function:

.groupBy((key, word) -> word)
.count(TimeWindows.of(5000L), "Counts")
.toStream((k, v) -> k.key());

Then I do see bucketing happening. However, it isn't accurate. For example,
I type "kafka" into the console as 20 separate sentences, but the output I
get is:

kafka 4
kafka 9
kafka 2
kafka 7

That adds up to 22, not 20. What am I doing wrong? What is the relationship
between the commit interval and the time window? The smaller I make the
commit interval, the less accurate it becomes.
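
One possible explanation, sketched here under assumptions rather than
confirmed from the run above: a windowed count is a continuously updated
table, and each row written downstream carries the current running count for
one (window, word) pair rather than a delta. A later row for the same window
then replaces the earlier one, and a shorter commit interval simply forwards
more of these intermediate rows. The plain-Java simulation below does not use
Kafka; the class name, method names, timestamps, and the fixed flush interval
standing in for the commit interval are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation, not Kafka Streams code.
class WindowUpdateSketch {

    // Returns the rows a downstream consumer would see.
    // Each row is {windowStartMs, runningCountForThatWindow}.
    // Assumes the timestamps are sorted in ascending order.
    static List<long[]> emitUpdates(long[] sortedTimestampsMs,
                                    long windowSizeMs,
                                    long flushIntervalMs) {
        Map<Long, Long> counts = new LinkedHashMap<>(); // window start -> running count
        Set<Long> dirty = new LinkedHashSet<>();        // windows changed since last flush
        List<long[]> emitted = new ArrayList<>();
        long nextFlush = flushIntervalMs;
        for (long ts : sortedTimestampsMs) {
            // Stand-in for the commit interval: forward pending updates periodically.
            while (ts >= nextFlush) {
                flush(counts, dirty, emitted);
                nextFlush += flushIntervalMs;
            }
            long windowStart = ts - (ts % windowSizeMs);
            counts.merge(windowStart, 1L, Long::sum);
            dirty.add(windowStart);
        }
        flush(counts, dirty, emitted); // forward whatever is still pending
        return emitted;
    }

    private static void flush(Map<Long, Long> counts, Set<Long> dirty, List<long[]> emitted) {
        for (long windowStart : dirty) {
            emitted.add(new long[] {windowStart, counts.get(windowStart)});
        }
        dirty.clear();
    }

    // Naive total: adds every emitted row, so intermediate updates are double-counted.
    static long sumOfAllRows(List<long[]> rows) {
        long sum = 0;
        for (long[] row : rows) sum += row[1];
        return sum;
    }

    // Keeps only the last row per window before adding.
    static long sumOfLatestRowPerWindow(List<long[]> rows) {
        Map<Long, Long> latest = new LinkedHashMap<>();
        for (long[] row : rows) latest.put(row[0], row[1]);
        long sum = 0;
        for (long count : latest.values()) sum += count;
        return sum;
    }

    public static void main(String[] args) {
        long[] ts = {100, 200, 300, 2500, 2600, 5100, 5200, 7300}; // 8 records
        List<long[]> rows = emitUpdates(ts, 5000L, 2000L);
        for (long[] row : rows) {
            System.out.println("window@" + row[0] + " kafka " + row[1]);
        }
        System.out.println("sum of all rows:       " + sumOfAllRows(rows));
        System.out.println("sum of latest per win: " + sumOfLatestRowPerWindow(rows));
    }
}
```

If this is what is happening, the mapper k.key() in the snippet above also
hides which window each row belongs to, so rows for different windows and
repeated updates for the same window look identical. Keeping the window start
in the output key (for example via k.window().start()) may make the rows
easier to interpret.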


On Wed, Jan 4, 2017 at 3:53 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Do you know about Kafka Streams? Its DSL gives you exactly what you
> want to do.
>
> Check out the documentation and WordCount example:
>
> http://docs.confluent.io/current/streams/index.html
>
> https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java
>
>
> Let us know if you have further questions.
>
>
> -Matthias
>
> On 1/4/17 12:48 PM, Benjamin Black wrote:
> > Hello,
> >
> > I'm looking for guidance on how to approach a counting problem. We want to
> > consume a stream of data that consists of IDs and generate an output of the
> > aggregated count with a window size of X seconds using processing time and
> > a hopping time window. For example, using a window size of 1 second, if we
> > get IDs 1, 2, 2, 2 in the 1st second, then the output would be 1=1, 2=3. If
> > we get IDs 1, 3, 3 in the 2nd second then the output would be 1=1, 3=2. The
> > aggregated count will then be turned into increment commands to a cache and
> > a database.
> >
> > Obviously we will need some state to be stored during the count of a
> > window, but we only need to keep it for the time period of the window (i.e.
> > a second). I was thinking this could be achieved by using a persistent
> > store, where the counts are reset during the punctuate and the store topic
> > uses log compaction. Alternatively, we could simply have an in-memory
> > store that is reset during the punctuate. My concern with the in-memory
> > store is that I don't know when the input topic offset is committed or when
> > the output data is written, and therefore we could lose data. Ultimately, at
> > the end of the second, the input offset and output data should be written
> > at the same time, reducing the likelihood of lost data. We would rather
> > lose data than have duplicate counts. What is the correct approach? Is
> > there a better way of tackling the problem?
> >
> > I have put together some code, but it doesn't do exactly what I expect. I'm
> > happy to share if it helps.
> >
> > Thanks,
> > Ben
> >
>
>
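
For reference, the output described in the quoted original question (a
1-second window, IDs 1, 2, 2, 2 in the first second and 1, 3, 3 in the next)
can be sketched in plain Java as below. This is only an illustration of
non-overlapping per-window counting; it is not Kafka Streams code and does not
address the offset-commit concern. The class and method names are
hypothetical, and the explicit arrival timestamps are an assumed stand-in for
processing time.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: count IDs per fixed, non-overlapping window.
class TumblingCountSketch {

    // events[i] = {arrivalTimeMs, id}; returns window start -> (id -> count).
    static Map<Long, Map<Long, Long>> countPerWindow(long[][] events, long windowSizeMs) {
        Map<Long, Map<Long, Long>> result = new TreeMap<>();
        for (long[] event : events) {
            // Align the arrival time down to the start of its window.
            long windowStart = event[0] - (event[0] % windowSizeMs);
            result.computeIfAbsent(windowStart, w -> new TreeMap<>())
                  .merge(event[1], 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        long[][] events = {
            {0, 1}, {100, 2}, {400, 2}, {900, 2},   // 1st second: IDs 1, 2, 2, 2
            {1000, 1}, {1500, 3}, {1999, 3}         // 2nd second: IDs 1, 3, 3
        };
        // Prints {0={1=1, 2=3}, 1000={1=1, 3=2}}
        System.out.println(countPerWindow(events, 1000L));
    }
}
```

Each inner map corresponds to the set of increment commands for one window;
once a window has been written out, its entry can be discarded.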
