kstream transform forward to different topics

2019-02-06 Thread Nan Xu
When I do the transform, for a single input record I need to output 3
different records, and those 3 records are of different classes. I want to
send each type of record to a separate topic. My understanding is that I
should use context.forward inside the transformer, like:

Transformer {
    ...
    context.forward(key, record1, To.child("topic1"));
    context.forward(key, record2, To.child("topic2"));
}
But how do I define those child processors? I can create them in the
topology, but who should be their parent, and what is the parent's name?

stream.transform(transformer) doesn't give me a way to specify a processor
name.

Thanks,
Nan
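
[For illustration, a minimal sketch of one way to wire named children with
the Processor API, assuming the 2.x Topology builder. To.child() takes the
name of a downstream node, so each sink is added as a named child of the
transforming processor. All node names ("source", "transform", "sink1",
"sink2") and topic names below are placeholders, not from the original
post.]

import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.To;

public class RoutingTopology {

    public static Topology build() {
        Topology topology = new Topology();
        topology.addSource("source", "input-topic");
        // The transforming logic lives in a named processor; that name is
        // the parent of the two sinks added below.
        topology.addProcessor("transform", RoutingProcessor::new, "source");
        // The sinks are the children; To.child() refers to these node
        // names, not to the topic names they write to.
        topology.addSink("sink1", "topic1", "transform");
        topology.addSink("sink2", "topic2", "transform");
        return topology;
    }

    static class RoutingProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            // Build the per-type records here; plain strings stand in for
            // the poster's different record classes.
            context().forward(key, value + "-a", To.child("sink1"));
            context().forward(key, value + "-b", To.child("sink2"));
        }
    }
}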


Re: measuring incoming bytes

2019-02-06 Thread Christopher Bogan
Bytespersec online

On Wed, Feb 6, 2019, 12:22 PM Amitav Mohanty wrote:

> Which part?
>
> -Amitav
>
> On Mon, Feb 4, 2019 at 8:42 PM Christopher Bogan <ambitiousking...@gmail.com> wrote:
>
> > Correct
> >
> > On Sat, Feb 2, 2019, 10:09 AM Amitav Mohanty wrote:
> >
> > > Hi
> > >
> > > I am trying to measure incoming bytes over time. I am trying to
> > > collect the following metric and apply an integral function over a
> > > set of data points on a time series.
> > >
> > > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=my_topic
> > > FifteenMinuteRate
> > >
> > > It seems that the number I am getting is much less than expected. So,
> > > I am suspecting that my understanding of this metric is wrong.
> > >
> > > I initially thought this metric is the total volume of bytes that
> > > came in for that topic in the last 15 mins. So, if I collect this
> > > metric every 15 mins, then I will have the total volume over time by
> > > calculating the sum of data points in the time interval.
> > >
> > > Please confirm if my understanding is correct.
> > >
> > > Regards,
> > > Amitav
> > >
> >
>
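
[For what it's worth, FifteenMinuteRate on a BytesInPerSec meter is an
exponentially weighted moving average in bytes per second, not a 15-minute
total, so summing the raw samples undercounts. A minimal sketch of
approximating total volume by integrating sampled rates; the sample values
below are made up.]

public class BytesInIntegrator {

    // Trapezoidal-rule integration of rate samples: average adjacent
    // bytes/sec readings and multiply by the elapsed seconds between them.
    static double approximateTotalBytes(long[] timestampsMs, double[] bytesPerSec) {
        double total = 0.0;
        for (int i = 1; i < timestampsMs.length; i++) {
            double dtSeconds = (timestampsMs[i] - timestampsMs[i - 1]) / 1000.0;
            total += 0.5 * (bytesPerSec[i] + bytesPerSec[i - 1]) * dtSeconds;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical samples taken every 15 minutes (900,000 ms apart).
        long[] t = {0L, 900_000L, 1_800_000L};
        double[] r = {1_000.0, 1_200.0, 900.0};
        System.out.printf("~%.0f bytes%n", approximateTotalBytes(t, r));
    }
}

[Alternatively, the same MBean exposes a cumulative Count attribute; the
difference between two Count readings gives an exact byte total for the
interval without any integration.]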


Re: Minimizing global store restoration time

2019-02-06 Thread Taylor P
Hi Patrik,

I am not sure that https://issues.apache.org/jira/browse/KAFKA-7380 will
resolve this issue since our application is dependent on the global store
being fully restored before the application can be considered healthy. It
does not seem like KAFKA-7380 is aiming to address the nature of global
stores restoring each partition sequentially - it is aiming to change the
blocking nature of #start(). Restoring the global store partitions in
parallel would definitely speed things up, though, and admittedly my first
thought when debugging this was "why isn't this restoring each partition in
parallel?".

Changing our streams topology to avoid using a global store for such a
large amount of data would be doable but it does seem like a significant
amount of work. I am curious to know if anyone else is storing large
amounts of data in global stores and whether there are any inherent
limitations to the size of global stores.

Our topic is already using compaction.

Taylor

On Wed, Feb 6, 2019 at 2:41 AM Patrik Kleindl wrote:

> Hi Taylor
>
> We are facing the same issue, although on a smaller scale.
> The main problem, as you found, is that the restoration runs
> sequentially. This should be addressed in
> https://issues.apache.org/jira/browse/KAFKA-7380, although there has been
> no progress lately.
>
> On the other hand, you could try to re-evaluate whether your problem can
> only be solved with global state stores. In our case (both in streams as
> well as for interactive queries) we could solve it with local state
> stores too, although only with more changes and more complexity in the
> topology.
>
> Not sure if it is applicable for your case, but have you looked into
> compression for the topics?
>
> best regards
>
> Patrik
>
> On Tue, 5 Feb 2019 at 22:37, Taylor P wrote:
>
> > Hi,
> >
> > I am having issues with the global store taking a very long time to
> > restore during startup of a Kafka Streams 2.0.1 application. The global
> > store is backed by a RocksDB persistent store and is added to the
> > Streams topology in the following manner:
> > https://pastebin.com/raw/VJutDyYe
> > The global store topic has approximately 15 million records per
> > partition and 18 partitions. The following global consumer settings are
> > specified:
> >
> > poll.timeout.ms = 10
> > max.poll.records = 2000
> > max.partition.fetch.bytes = 1048576
> > fetch.max.bytes = 52428800
> > receive.buffer.bytes = 65536
> >
> > I have tried tweaking the settings above on the consumer side, such as
> > increasing poll.timeout.ms to 2000, max.poll.records to 1, and
> > max.partition.fetch.bytes to 52428800, but it seems that I keep hitting
> > a ceiling of restoring approximately 100,000 records per second. With
> > 15 million records per partition, it takes approximately 150 seconds to
> > restore a single partition. With 18 partitions, it takes roughly 45
> > minutes to fully restore the global store.
> >
> > Switching from HDDs to SSDs on the brokers' log directories made
> > restoration roughly 25% faster overall, but this still feels slow. It
> > seems that I am hitting IOPS limits on the disks and am not even close
> > to hitting the throughput limits of the disks on either the broker or
> > streams application side.
> >
> > How can I minimize restoration time of a global store? Are there settings
> > that can increase throughput with the same number of IOPS? Ideally
> > restoration of each partition could be done in parallel but I recognize
> > there is only a single global store thread. Bringing up a new instance of
> > the Kafka Streams application occurs on a potentially daily basis, so the
> > restoration time is becoming more and more of a hassle.
> >
> > Thanks.
> >
> > Taylor
> >
>
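
[As an aside, in 2.0.x consumer overrides like the ones quoted above can be
scoped to the global store's consumer with StreamsConfig#globalConsumerPrefix.
A minimal sketch; the application id, bootstrap servers, and values are
illustrative placeholders, not recommendations.]

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class GlobalConsumerTuning {
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // These overrides apply only to the global store's consumer, not
        // to the main consumer or the restore consumer.
        props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10_000);
        props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 52_428_800);
        props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 52_428_800);
        props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.RECEIVE_BUFFER_CONFIG), 65_536);
        return props;
    }
}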


Re: Kafka exactly-once with multiple producers

2019-02-06 Thread Tim Jiang
Thanks Matthias, this is exactly the answer I was looking for!

On Tue, Feb 5, 2019 at 11:26 PM Matthias J. Sax wrote:

> Each producer will need to use its own `transactional.id`. Otherwise,
> one producer would fence off and "block" the other.
>
> Both producers can start transactions independently from each other, and
> also commit independently (or abort, or a mix of commit/abort between
> both). Messages of both producers will be written interleaved into the
> topic (similar to how producers write without EOS).
>
> > does Kafka guarantee only 1 producer request
> > will be processed at a time?
>
> No. There is no reason for this. It's possible that both producers have
> an open transaction and write their data interleaved into the topic.
>
>
> -Matthias
>
> On 2/5/19 5:55 PM, Tim Jiang wrote:
> > Hi,
> > I read this about how Kafka transactions work:
> > https://www.confluent.io/blog/transactions-apache-kafka/
> > Please help me understand what happens when multiple producers are
> > writing to the same topic concurrently. Should all of them use the same
> > transaction id, and if so, does Kafka guarantee only 1 producer request
> > will be processed at a time?
> >
> > Thanks,
> > Tim
> >
>
>
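
[To make the above concrete, a minimal sketch of two producers with
distinct transactional.id values writing to one topic. The bootstrap
servers, topic name, and ids are placeholders.]

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TwoTransactionalProducers {

    static KafkaProducer<String, String> create(String transactionalId) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Distinct transactional.id per producer; sharing one would make
        // the second producer fence off the first.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) {
        KafkaProducer<String, String> p1 = create("txn-producer-1");
        KafkaProducer<String, String> p2 = create("txn-producer-2");
        p1.initTransactions();
        p2.initTransactions();

        // Both transactions can be open at once; their messages end up
        // interleaved in the topic.
        p1.beginTransaction();
        p2.beginTransaction();
        p1.send(new ProducerRecord<>("shared-topic", "k1", "from p1"));
        p2.send(new ProducerRecord<>("shared-topic", "k2", "from p2"));
        p1.commitTransaction();
        p2.abortTransaction(); // commits and aborts are independent per producer

        p1.close();
        p2.close();
    }
}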


Re: measuring incoming bytes

2019-02-06 Thread Amitav Mohanty
Which part?

-Amitav

On Mon, Feb 4, 2019 at 8:42 PM Christopher Bogan wrote:

> Correct
>
> On Sat, Feb 2, 2019, 10:09 AM Amitav Mohanty wrote:
>
> > Hi
> >
> > I am trying to measure incoming bytes over time. I am trying to
> > collect the following metric and apply an integral function over a set
> > of data points on a time series.
> >
> > kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=my_topic
> > FifteenMinuteRate
> >
> > It seems that the number I am getting is much less than expected. So,
> > I am suspecting that my understanding of this metric is wrong.
> >
> > I initially thought this metric is the total volume of bytes that came
> > in for that topic in the last 15 mins. So, if I collect this metric
> > every 15 mins, then I will have the total volume over time by
> > calculating the sum of data points in the time interval.
> >
> > Please confirm if my understanding is correct.
> >
> > Regards,
> > Amitav
> >
>


Kafka streams exactly_once auto commit timeout transaction issue

2019-02-06 Thread Xander Uiterlinden
Hi,

I'm trying to get a fairly simple example of using Kafka Streams with
exactly-once processing to work. I defined a setup where messages are read
from an input topic and two streams transform them and output the result to
their own output topics.
In normal conditions this works fine, i.e. when publishing a message to the
input topic, I get transformed messages in both of the output topics.
Next I enabled exactly-once processing by setting "processing.guarantee" to
"exactly_once". To test this I'm deliberately throwing an exception when
transforming the message in one of the stream processors. At first glance
the result is as expected: neither of the output topics contains the
transformed message, and the application stops processing.
However, when processing a message takes longer than the commit.interval.ms
(which defaults to 100 ms when using exactly_once), the transactional
guarantee does not appear to hold and I get an output message in only one
of the output topics (the one for which I did not deliberately throw an
exception while processing). I tested this by putting a Thread.sleep()
before throwing the exception.
Can someone explain the relationship between commit.interval.ms and
exactly_once processing? I find it rather strange that when processing
takes longer than the commit.interval.ms you lose the atomicity of the
transaction.
I'm using Kafka 2.12-1.1.0 by the way.

Kind regards,

Xander
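
[For reference, a minimal sketch of the kind of setup described above,
assuming placeholder topic names and a simulated failure in the second
branch; this is not the poster's actual code.]

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class EosTwoBranches {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Turns on transactions and lowers the commit.interval.ms default
        // to 100 ms.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");
        input.mapValues(v -> v.toUpperCase()).to("output-topic-1");
        input.<String>mapValues(v -> {
            // Deliberate failure: with exactly_once, neither output topic
            // should expose the transformed message to read_committed
            // consumers.
            throw new RuntimeException("simulated processing failure");
        }).to("output-topic-2");

        new KafkaStreams(builder.build(), props).start();
    }
}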