Re: kafka streams partition assignor strategy for version 2.5.1 - does it use sticky assignment

2023-04-15 Thread John Roesler
Hi Pushkar,

In 2.5, Kafka Streams used an assignor that tried to strike a compromise 
between stickiness and workload balance, so you would observe some stickiness, 
but not all the time.

In 2.6, we introduced the "high availability task assignor" (see KIP-441 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams).
 This assignor is guaranteed to always assign tasks to the instance that is 
most caught up (typically, this would be the instance that was already the 
active processor, which is equivalent to stickiness). In the case of losing an 
instance (eg the pod gets replaced), any standby replica would be considered 
"most caught up" and would take over processing with very little downtime.

The new assignor achieves balance over time by "warming up" tasks in the 
background on other instances and then swapping the assignment over to them 
once they are caught up.

So, if you upgrade Streams, you should be able to configure at least one 
standby task and then be able to implement the "rolling replacement" strategy 
you described. If you are willing to wait until Streams gradually balances the 
assignment over time after each replacement, then you can cycle out the whole 
cluster without ever having downtime or developing workload skew. Note that 
there are several configuration parameters you can adjust to speed up the 
warm-up process: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-Parameters.
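
As a rough sketch (the application id, bootstrap servers, and values below are 
only placeholders to illustrate the relevant settings; tune them for your state 
size and recovery requirements), the configuration could look like:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class WarmupConfigSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");          // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        // Keep one warm replica of each store so a replaced pod can take over quickly.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        // Upper bound on extra "warming" replicas used to move tasks toward balance.
        props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);
        // How often Streams probes whether warmed-up tasks are ready to take over.
        props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 10 * 60 * 1000L);
        // Max changelog lag (in offsets) at which an instance still counts as caught up.
        props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10_000L);
        return props;
    }
}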

I hope this helps!
-John

On 2023/04/14 17:41:19 Pushkar Deole wrote:
> Any inputs on below query?
> 
> On Wed, Apr 12, 2023 at 2:22 PM Pushkar Deole  wrote:
> 
> > Hi All,
> >
> > We are using version 2.5.1 of kafka-streams with 3 application instances
> > deployed as 3 kubernetes pods.
> > It consumes from multiple topics, each with 6 partitions.
> > I would like to know if streams uses sticky partition assignor strategy
> > internally since we can't set it externally on streams.
> >
> > My scenario is like this: during rolling upgrades
> > Step 1: 1 new pod comes up so there are 4 pods, with some partitions
> > assigned to newly created pod and k8s then deletes one of older pods, so it
> > is pod1, pod2, pod3 (older) and pod4 (newer). Then pod1 is deleted. So
> > ultimately pod2, pod3, pod4
> >
> > Step 2: K8s then repeats same for another old pod i.e. create a new pod
> > and then delete old pod. So pod2, pod3, pod4, pod5 and then delete pod2. So
> > ultimately pod3, pod4 and pod5
> >
> > The question I have here is: will kafka streams try to sticky with the
> > partitions assigned to newly created pods during all these rebalances i.e.
> > the partitions assigned to pod4 in step 1 will still be retained during
> > step 2 when another older pod gets deleted OR the partitions are reshuffled
> > on each rebalance whenever older pods get deleted. So during step 2, when
> > pod2 is deleted, the partitions assigned to pod4 in step 1 will also
> > reshuffle again or it will be there and any new partitions will only be
> > assigned?
> >
> >
> 


Re: same keys appearing in state stores on different pods when using branches in kafka streams

2022-12-05 Thread John Roesler
Hi Pushkar,

I'm sorry for the delay. I'm afraid I'm having trouble picturing the situation. 
Can you provide the topology description? That will show us whether we should 
expect the stores to always be in the same instances or not. If you can also 
include a simplified version of your program, we might be able to provide some 
suggestions.
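
(If it's easier, the description is just the output of Topology#describe(); a 
minimal sketch, assuming a StreamsBuilder named "builder" that builds your real 
topology:)

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class PrintTopology {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // ... build the same topology your application builds ...
        Topology topology = builder.build();
        // Paste this output into the thread, or into the kafka-streams-viz tool.
        System.out.println(topology.describe());
    }
}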

Thanks,
-John

On Mon, Dec 5, 2022, at 10:52, Pushkar Deole wrote:
> John or Matthias
>
> can you help here, we are frequently getting errors like below:
>
> org.apache.kafka.streams.errors.InvalidStateStoreException: The state
> store, records, may have migrated to another instance.
>
> For the same key, the record exist in totals state store but not in
> 'records' state store.
>
> John,
>
> can you provide more details on the groupBy option?
>
> On Tue, Nov 29, 2022 at 12:24 PM Pushkar Deole  wrote:
>
>> Hi John,
>>
>> I am not sure I understood it correctly, even with branching that uses a
>> different state store, the key of incoming event is still the same, so we
>> expect it to land in the local state store on the same pod.
>> e.g. an event with OPEN status, with key xyz came in and processed through
>> one branch and it is stored in state store 'totals', state maintained on
>> local state store on same pod
>> 2nd event with OPEN status, with key xyz came in and again processed and
>> stored in 'totals'. State maintained on local state store on same pod
>>
>> 3rd event with CLOSED status, with key xyz came in and processed. The
>> state is stored in 'record' state store, it is expected to be stored in
>> state store on same pod.
>> Why it would go to some other pod?
>>
>> On Wed, Nov 23, 2022 at 8:50 PM John Roesler  wrote:
>>
>>> Hi Pushkar,
>>>
>>> Thanks for the question. I think that what’s happening is that, even
>>> though both branches use the same grouping logic, Streams can’t detect that
>>> they are the same. It just sees two group-bys and therefore introduces two
>>> repartitions, with a separate downstream task for each.
>>>
>>> You might want to print out the topology description and visualize it
>>> with https://zz85.github.io/kafka-streams-viz/ . That will show you
>>> whether the stores wind up in the same task or not.
>>>
>>> The visualization will also show you the names of the input topics for
>>> those two partitions, which you can use in conjunction with the metadata
>>> methods on your KafkaStreams instance to query for the location of the keys
>>> in both stores.
>>>
>>> I suspect that with some tweaks you can re-write the topology to just
>>> have one downstream task, if that’s what you prefer.
>>>
>>> By the way, I think you could propose to add an optimization to make the
>>> groupBy behave the way you expected. If that’s interesting to you, let us
>>> know and we can give you some pointers!
>>>
>>> I hope this helps,
>>> John
>>>
>>> On Wed, Nov 23, 2022, at 05:36, Pushkar Deole wrote:
>>> > Hi All,
>>> >
>>> > I have a stream application that creates 2 branches.  Each branch
>>> includes
>>> > a state store where the status field of the kafka message determines the
>>> > branch, and therefore the state store used:
>>> >
>>> > Status OPEN = State store name totals
>>> >
>>> > Status CLOSED = State store name records
>>> >
>>> >
>>> >
>>> > I’m seeing that the streams application is running on a pod; however I’m
>>> > getting the exception:
>>> >
>>> > org.apache.kafka.streams.errors.InvalidStateStoreException: The state
>>> > store, records, may have migrated to another instance.
>>> >
>>> >
>>> >
>>> > If I physically access the pod and check the Rocksdb folders I do not
>>> see
>>> > the state store folder.  If I check the keys in the totals state store
>>> on
>>> > this pod, I can find the key in the records state store on another pod.
>>> I
>>> > had assumed that because the key of the events are the same, the same
>>> > partition would be used for the two branches and therefore the same
>>> keys in
>>> > these two state store would be created on the same Kubernetes pod.
>>> This is
>>> > not an issue for the Kafka stream, but that assumption was used in the
>>> way
>>> > the state stores are read.  I assumed if I found the key in the 'totals'
>>> > state store, the same key would be found on the same pod in the 'records'
>>> > state store.

Re: Kafka service unavailable

2022-12-05 Thread John Roesler
Hi Rakesh,

I'm sorry for your trouble. The mailing list doesn't transmit embedded images, 
so we can't see the information you provided. Maybe you can create a Github 
Gist or open a Jira ticket at 
https://issues.apache.org/jira/projects/KAFKA/issues ?

Thanks,
-John

On Sun, Dec 4, 2022, at 23:59, Rakesh Kumar Tunuguntla wrote:
> Hi Team,
> 
> We have a Kafka service installed and we are unable to restore the 
> service or restart it. 
> 
> When we check the status, it is showing as failed.
> 
> Can you please look into the query and provide the resolution faster.
> 
> Warm Regards,
> Rakesh Kumar


Re: Stream sinks are not constructed when application starts up before Kafka broker

2022-11-23 Thread John Roesler
Hi Alexander,

I’m sorry to hear that. It certainly sounds like a hard one to debug. 

To clarify, do you mean that when you observe this problem, the sink node is 
not in the topology at all, or that it is in the topology, but does not 
function properly?

Also, are you using Spring to construct the topology, or are you calling the 
Kafka Streams library directly to build the topology?

If the problem is that the sink node is missing completely, it’s hard to 
imagine how the problem could be Streams. When you are building the topology in 
Streams, there is no connection to Kafka at all.

Then again, I’ve seen enough heisenbugs to know not to trust intuition too 
much. If you can try just using the Streams builder directly to create the 
topology, maybe you can see if you can still reproduce the issue?
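
For example, a bare-bones check along these lines (the topic names and serdes 
here are placeholders, not your actual ones) builds a topology and prints its 
description without ever contacting a broker, so the sink node should always 
show up:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class SinkNodeCheck {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Placeholder topics; substitute the ones your Spring bindings would use.
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(value -> value.toUpperCase())
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        Topology topology = builder.build();
        // The sink node writing to "output-topic" should appear here even with no broker running.
        System.out.println(topology.describe());
    }
}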

I hope this helps!
-John

On Tue, Nov 22, 2022, at 14:07, Alexander Kau wrote:
> My team is building a set of services that use Kafka Connect and Debezium
> to forward data changes from our Postgres database to Kafka, and then use
> Kafka Streams (via Spring Cloud Stream) to process this data and output an
> aggregate of the source data.
>
> We have been trying to track down an issue where the stream processors are
> not correctly configured when the application starts up before Kafka is up.
> Specifically, all of the processor nodes are correctly created except for
> the KTABLE-SINK-0# node. The result of this is that the services
> consume messages but do not forward their changes to their output topics.
> Therefore, data processing stops while the consumer offset continues to be
> incremented, so we lose messages and have to roll back the offsets and
> reprocess a large amount of data.
>
> This happens in both our Kubernetes environment and our Chef-managed
> environment. In the Chef environment, simply restarting the server is
> enough to trigger this issue, since Kafka takes longer to start up than the
> application. In Kubernetes, I can reliably trigger the issue by removing
> the application's dependency on Kafka, stopping Kafka, restarting the
> application, and then starting Kafka.
>
> I have tested with Kafka 3.0.0 and 3.3.1, and the behavior does not change.
> We are using the latest Spring dependencies (Spring cloud 2021.0.5, Spring
> Cloud Stream 3.2.6).
>
> This may be a "heisenbug": a bug that only occurs when it is not being
> observed. I spent most of yesterday debugging the Kafka setup code during
> application startup with and without the Kafka broker running, and was
> unable to reproduce the bug while doing so, but was able to consistently
> reproduce it as soon as I stopped debugging the startup process. I suspect
> that this may mean that this is a race condition in the parts of the
> application that only run after connecting to the Kafka broker, but I'm not
> familiar enough with the Kafka source to go much further on this.
>
> Although I understand that this is a bit of an edge case (the Kafka broker
> should generally not go down), the results here involve missing/invalid
> data, and it is not possible to confirm whether the application is in this
> case except by either confirming that "this consumer consumed a message but
> didn't forward it to its output topic" or by hooking up a debugger and
> inspecting the ProcessorContext, so we can't reasonably implement a health
> check to verify whether the application is in the bad state and restart it.
> (Even if we could check for this state, there's no guarantee that it didn't
> lose some messages while it was in the bad state.)
>
> I've done a fair amount of searching for this sort of issue, but have been
> unable to find any other people who I can confirm to have the same issue. I
> am not certain whether this is an issue in Kafka itself or in Spring Cloud
> Stream.
>
> Any guidance or suggestions would be appreciated.


Re: same keys appearing in state stores on different pods when using branches in kafka streams

2022-11-23 Thread John Roesler
Hi Pushkar,

Thanks for the question. I think that what’s happening is that, even though 
both branches use the same grouping logic, Streams can’t detect that they are 
the same. It just sees two group-bys and therefore introduces two repartitions, 
with a separate downstream task for each.

You might want to print out the topology description and visualize it with 
https://zz85.github.io/kafka-streams-viz/ . That will show you whether the 
stores wind up in the same task or not.

The visualization will also show you the names of the input topics for those 
two partitions, which you can use in conjunction with the metadata methods on 
your KafkaStreams instance to query for the location of the keys in both stores.
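
For example (a sketch; it assumes the stores are keyed with the String serde 
and that "totals" and "records" are the store names from your topology), 
something like this prints which instance is active for a given key in each 
store:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;

public class StoreLocator {
    // Prints the active host and partition for the given key in each store.
    static void printKeyLocation(KafkaStreams streams, String key) {
        for (String store : new String[] {"totals", "records"}) {
            KeyQueryMetadata meta =
                streams.queryMetadataForKey(store, key, Serdes.String().serializer());
            System.out.println(store + " -> " + meta.activeHost()
                + " (partition " + meta.partition() + ")");
        }
    }
}

If the two stores end up in different tasks, you may well see different hosts 
(or partitions) for the same key, which would be consistent with what you're 
observing.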

I suspect that with some tweaks you can re-write the topology to just have one 
downstream task, if that’s what you prefer.

By the way, I think you could propose to add an optimization to make the 
groupBy behave the way you expected. If that’s interesting to you, let us know 
and we can give you some pointers!

I hope this helps,
John

On Wed, Nov 23, 2022, at 05:36, Pushkar Deole wrote:
> Hi All,
>
> I have a stream application that creates 2 branches.  Each branch includes
> a state store where the status field of the kafka message determines the
> branch, and therefore the state store used:
>
> Status OPEN = State store name totals
>
> Status CLOSED = State store name records
>
>
>
> I’m seeing that the streams application is running on a pod; however I’m
> getting the exception:
>
> org.apache.kafka.streams.errors.InvalidStateStoreException: The state
> store, records, may have migrated to another instance.
>
>
>
> If I physically access the pod and check the Rocksdb folders I do not see
> the state store folder.  If I check the keys in the totals state store on
> this pod, I can find the key in the records state store on another pod. I
> had assumed that because the key of the events are the same, the same
> partition would be used for the two branches and therefore the same keys in
> these two state store would be created on the same Kubernetes pod.  This is
> not an issue for the Kafka stream, but that assumption was used in the way
> the state stores are read.  I assumed if I found the key in the 'totals'
> state store, the same key would be found on the same pod in the 'records'
> state store.
>
>
>
> The questions I have are:
>
> 1) Is it expected that the state stores can hold the partition data on
> different pods, and is this unique to streams using branch?
>
> 2) Is there a way to know if the state store is on the pod to avoid
> handling this as an exception?
>
>
>
> Here is the topology of the stream in question:
>
> KStream[] branches = stream
>         .peek(receivingEventLogger)
>         .selectKey(keyMapper)
>         .mapValues(totalsValueMapper)
>         .filter(nullKeyValueEventFilter)
>         .branch((k, v) -> (RecordStatus.CLOSED.name().equalsIgnoreCase(v.getCurrent().getRecordStatus())
>                         || RecordStatus.LB_RDELETE.name().equalsIgnoreCase(v.getCurrent().getRecordStatus())),
>                 (k, v) -> true);
>
> // CLOSED and LB_RDELETE branch writes to records state store
> branches[0]
>         .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
>         .aggregate(totalsInitializer, totalsAggregator, materializedRecords)
>         .toStream()
>         .map(totalsInternalKeyValueMapper)
>         .filter(nullKeyStringValueEventFilter)
>         .to(loopbackTopic.name());
>
> // DEFAULT branch writes to totals state store
> branches[1]
>         .groupByKey(Grouped.with(Serdes.String(), totalsValueSerde))
>         .aggregate(totalsInitializer, totalsAggregator, materializedTotals)
>         .toStream()
>         .flatMap(totalsKeyValueMapper)
>         .filter(nullKeyStringValueEventFilter)
>         .peek(sendingEventLogger)
>         .to(toTopic.name());


Re: [ANNOUNCE] New Kafka PMC Member: Bruno Cadonna

2022-11-01 Thread John Roesler
Congratulations, Bruno!!!

On Tue, Nov 1, 2022, at 15:16, Lucas Brutschy wrote:
> Wow, congratulations!
>
> On Tue, Nov 1, 2022 at 8:55 PM Chris Egerton  wrote:
>>
>> Congrats!
>>
>> On Tue, Nov 1, 2022, 15:44 Bill Bejeck  wrote:
>>
>> > Congrats Bruno! Well deserved.
>> >
>> > -Bill
>> >
>> > On Tue, Nov 1, 2022 at 3:36 PM Guozhang Wang  wrote:
>> >
>> > > Hi everyone,
>> > >
>> > > I'd like to introduce our new Kafka PMC member, Bruno.
>> > >
>> > > Bruno has been a committer since April. 2021 and has been very active in
>> > > the community. He's a key contributor to Kafka Streams, and also helped
>> > > review a lot of horizontal improvements such as Mockito. It is my
>> > pleasure
>> > > to announce that Bruno has agreed to join the Kafka PMC.
>> > >
>> > > Congratulations, Bruno!
>> > >
>> > > -- Guozhang Wang, on behalf of Apache Kafka PMC
>> > >
>> >


Re: Reprocessing messsages in kafka streams vs Transformer init

2022-10-07 Thread John Roesler
Hello Tomasz,

Thanks for the question!

Streams should always call init() before passing any records to transform(...).

When we talk about "reprocessing", we just mean that some record was processed, 
but then there was a failure before its offset was committed, and therefore we 
have to process it again after recovery. There's no difference in code paths 
between the first and second time the record is processed. In all cases, 
Streams will initialize the Transformer (and other Processors) before using 
them.

I think the last part of the answer is that, if we wind up observing the same 
record twice in transform(...), it must be because there was a failure and then 
a recovery, in which case, we will close and re-create the processor (and call 
init() ).

One thing to be aware of, though, is that (under at-least-once), Streams does 
not guarantee to clear out any dirty state from a state store. Therefore, even 
though we re-initialize the processors (including Transformers), if you're 
using a persistent state store, we'll just re-open it. One of the key things 
that exactly-once mode does differently is to clear out all dirty writes from 
the state store during failure recovery.
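
(If you want that stronger guarantee, it's a single config change; a sketch, 
assuming Streams 3.0+ and brokers 2.5+, where "exactly_once_v2" is available:)

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class EosConfigSketch {
    public static Properties withExactlyOnce(Properties props) {
        // Under EOS, Streams wipes uncommitted local state on failure
        // before re-bootstrapping, instead of reusing the dirty store.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return props;
    }
}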

I hope this helps!

Thanks,
John

On Fri, Oct 7, 2022, at 15:57, xardaso wrote:
> Hi Everyone,
>
> I have a question related to messages auto reprocessing in kafka 
> streams and Transformer/Processor init()/close() methods.
>
> Is possible that in some scenarios (failures, rebalance etc.) a message 
> is processed twice by Transformer.transform() without calling 
> Transformer.init() between the first and the second processing?
>
> To illustrate my question with an example. Let's say I have kafka 
> streams application with a default at_least_once semantics setting. The 
> topology is the following - read messages from one topic, apply 
> Transformer and produce messages to another topic. Transformer:
>
> new TransformerSupplier<K, V, KeyValue<K, V>>() {
>     public Transformer<K, V, KeyValue<K, V>> get() {
>         return new Transformer<K, V, KeyValue<K, V>>() {
>             private ProcessorContext context;
>
>             public void init(ProcessorContext context) {
>                 logger.info("init() called.");
>                 this.context = context;
>             }
>
>             public KeyValue<K, V> transform(K key, V value) {
>                 logger.info("Transform offset: {}, partition: {}",
>                         context.offset(), context.partition());
>                 return new KeyValue<>(key, value);
>             }
>
>             public void close() {
>                 logger.info("close() called.");
>             }
>         };
>     }
> }
>
> Is it possible that the app will log "Transform offset: x, partition: 
> y" twice with the same x and y values each time without logging "init() 
> called." between (chronologically)?
>
> I tried to simulate some failure scenarios (for example when 
> CommitFailedException is thrown) but in my simple test cases init() was 
> always called between the first and the second reprocessing. Although I 
> could have easily missed something or a scenario where this is not true 
> which is why I'm asking this question here.
>
> Sorry, if my question is not clear - please let me know in this case 
> and I'll try to clarify.
>
> Thank you for any help you can provide.
>
> Regards,
> Tomasz


Re: [VOTE] 3.3.1 RC0

2022-09-30 Thread John Roesler
Hi José,

I verified the signatures and ran all the unit tests, as well as the Streams 
integration tests with:

> ./gradlew -version
> 
> 
> Gradle 7.4.2
> 
> 
> Build time:   2022-03-31 15:25:29 UTC
> Revision: 540473b8118064efcc264694cbcaa4b677f61041
> 
> Kotlin:   1.5.31
> Groovy:   3.0.9
> Ant:  Apache Ant(TM) version 1.10.11 compiled on July 10 2021
> JVM:  1.8.0_342 (Private Build 25.342-b07)
> OS:   Linux 5.15.0-48-generic amd64

I'm +1 (binding), pending system test results.

Thanks,
-John

On Fri, Sep 30, 2022, at 11:46, Bill Bejeck wrote:
> Hi,
>
> I did the following to validate the release:
>
>1. Validated all checksums, signatures
>2. Built from source and ran all the unit tests
>3. Ran the ZK and KRaft quickstart
>4. Ran the Raft single quorum test
>5. Ran the Kafka Streams quick start
>
> +1(binding) pending successful system test run
>
> Thanks,
> Bill
>
> On Fri, Sep 30, 2022 at 5:30 AM David Jacot 
> wrote:
>
>> Hey,
>>
>> I performed the following validations:
>> * Verified all checksums and signatures.
>> * Built from source and ran unit tests.
>> * Ran the first quickstart steps for both ZK and KRaft.
>> * Spotchecked the Javadocs.
>>
>> I am +1 (binding), assuming that the system tests look good.
>>
>> Thanks for running the release.
>>
>> Best,
>> David
>>
>> On Fri, Sep 30, 2022 at 2:23 AM José Armando García Sancio
>>  wrote:
>> >
>> > On Thu, Sep 29, 2022 at 2:39 PM José Armando García Sancio
>> >  wrote:
>> > > Please download, test and vote by Tuesday, October 4, 9am PT.
>> >
>> > The vote will be open for 72 hours. Please vote by Sunday, October 2nd,
>> 3 PM PT.
>> >
>> > Thanks!
>> > --
>> > -José
>>


Re: Out of order messages when kafka streams application catches up

2022-09-30 Thread John Roesler
Hi again, Tomasz,

Your issue is really bugging me, since I'm pretty sure it shouldn't be 
happening.

I went ahead and added an integration test with your exact scenario, as I 
understand it: https://github.com/apache/kafka/pull/12706

The test passes for me.

Do you think you can check it out and try adjusting the test setup until you're 
able to reproduce the behavior you're seeing? If you can do that, I think we 
will get to the bottom of it.

Thanks,
-John

On Fri, Sep 30, 2022, at 09:51, John Roesler wrote:
> Hi Tomasz,
>
> Thanks for trying that out. It’s not the way I’d expect it to work. I 
> don’t remember if there were any follow-up bugs that have been solved 
> in subsequent releases. Just as a long shot, perhaps you can try it on 
> the latest release (3.3.0)?
>
> Otherwise, I think the best path forward would be to file a bug report 
> on the Apache Kafka Jira with enough information to reproduce the issue 
> (or if you’re able to provide a repro, that would be awesome).
>
> Thanks, and sorry for the trouble. 
> -John
>
> On Tue, Sep 27, 2022, at 03:15, Tomasz Gac wrote:
>> I upgraded to kafka streams 3.0.0 with positive task.max.idle.ms and it did
>> not help.
>> When lag is large, the application still consumes data batches without
>> interleaving.
>>
>>
>>
>> On Tue, Sep 27, 2022 at 05:51, John Roesler  wrote:
>>
>>> Hi Tomasz,
>>>
>>> Thanks for asking. This sounds like the situation that we fixed in Apache
>>> Kafka 3.0, with KIP-695 (
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization
>>> ).
>>>
>>> Can you try upgrading and let us know if this fixes the problem?
>>>
>>> Thanks,
>>> -John
>>>
>>> On Mon, Sep 26, 2022, at 01:35, Tomasz Gac wrote:
>>> > Hi group,
>>> >
>>> > I wrote a simple kafka streams application with topology such as below:
>>> >
>>> > builder.addStateStore(
>>> >> Stores.keyValueStoreBuilder(
>>> >> Stores.persistentKeyValueStore("STORE"),
>>> >> Serdes.String(), Serdes.String())
>>> >> .withLoggingEnabled(storeConfig))|
>>> >
>>> >
>>> >
>>> > builder.stream("TOPIC_1", Consumed.with(...))
>>> >> .merge(builder.stream("TOPIC_2", Consumed.with(...))
>>> >> .merge(builder.stream("TOPIC_3", Consumed.with(...))
>>> >> .map(...) // stateless
>>> >> .transform(..., "STORE")  // stateful
>>> >
>>> > .to("TOPIC_4");
>>> >
>>> >
>>> > All input topics have 6 partitions, and for the purpose of testing, we
>>> are
>>> > producing data to partition number 5.
>>> > We are using kafka streams version 2.8.1, broker version 2.12-2.1.1
>>> >
>>> > The application works as expected when it has caught up to the lag, eg.
>>> > when reset tool is used with --to-latest parameter.
>>> > However, when the application is processing the messages starting from
>>> the
>>> > earliest offset, the inputs are provided in batches such as:
>>> >
>>> >- ~1000 messages from TOPIC_1
>>> >- ~1000 messages from TOPIC_2
>>> >- ~1000 messages from TOPIC_3
>>> >
>>> > All of the messages have timestamps provided in headers, so I would
>>> expect
>>> > the application to interleave the messages from these three topics so
>>> that
>>> > their timestamps are in the ascending order.
>>> > However, this is not the case that I am observing. The messages are
>>> > processed in batches.
>>> >
>>> > How do I configure my application so that it processes messages in order
>>> > when it is catching up to the lag?
>>>


Re: Out of order messages when kafka streams application catches up

2022-09-30 Thread John Roesler
Hi Tomasz,

Thanks for trying that out. It’s not the way I’d expect it to work. I don’t 
remember if there were any follow-up bugs that have been solved in subsequent 
releases. Just as a long shot, perhaps you can try it on the latest release 
(3.3.0)?

Otherwise, I think the best path forward would be to file a bug report on the 
Apache Kafka Jira with enough information to reproduce the issue (or if you’re 
able to provide a repro, that would be awesome).

Thanks, and sorry for the trouble. 
-John

On Tue, Sep 27, 2022, at 03:15, Tomasz Gac wrote:
> I upgraded to kafka streams 3.0.0 with positive task.max.idle.ms and it did
> not help.
> When lag is large, the application still consumes data batches without
> interleaving.
>
>
>
> On Tue, Sep 27, 2022 at 05:51, John Roesler  wrote:
>
>> Hi Tomasz,
>>
>> Thanks for asking. This sounds like the situation that we fixed in Apache
>> Kafka 3.0, with KIP-695 (
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization
>> ).
>>
>> Can you try upgrading and let us know if this fixes the problem?
>>
>> Thanks,
>> -John
>>
>> On Mon, Sep 26, 2022, at 01:35, Tomasz Gac wrote:
>> > Hi group,
>> >
>> > I wrote a simple kafka streams application with topology such as below:
>> >
>> > builder.addStateStore(
>> >> Stores.keyValueStoreBuilder(
>> >> Stores.persistentKeyValueStore("STORE"),
>> >> Serdes.String(), Serdes.String())
>> >> .withLoggingEnabled(storeConfig))|
>> >
>> >
>> >
>> > builder.stream("TOPIC_1", Consumed.with(...))
>> >> .merge(builder.stream("TOPIC_2", Consumed.with(...))
>> >> .merge(builder.stream("TOPIC_3", Consumed.with(...))
>> >> .map(...) // stateless
>> >> .transform(..., "STORE")  // stateful
>> >
>> > .to("TOPIC_4");
>> >
>> >
>> > All input topics have 6 partitions, and for the purpose of testing, we
>> are
>> > producing data to partition number 5.
>> > We are using kafka streams version 2.8.1, broker version 2.12-2.1.1
>> >
>> > The application works as expected when it has caught up to the lag, eg.
>> > when reset tool is used with --to-latest parameter.
>> > However, when the application is processing the messages starting from
>> the
>> > earliest offset, the inputs are provided in batches such as:
>> >
>> >- ~1000 messages from TOPIC_1
>> >- ~1000 messages from TOPIC_2
>> >- ~1000 messages from TOPIC_3
>> >
>> > All of the messages have timestamps provided in headers, so I would
>> expect
>> > the application to interleave the messages from these three topics so
>> that
>> > their timestamps are in the ascending order.
>> > However, this is not the case that I am observing. The messages are
>> > processed in batches.
>> >
>> > How do I configure my application so that it processes messages in order
>> > when it is catching up to the lag?
>>


Re: Out of order messages when kafka streams application catches up

2022-09-26 Thread John Roesler
Hi Tomasz,

Thanks for asking. This sounds like the situation that we fixed in Apache Kafka 
3.0, with KIP-695 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization).

Can you try upgrading and let us know if this fixes the problem?
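
If you do try it, the related knob is max.task.idle.ms, which tells Streams how 
long to wait for data from lagging inputs before processing what it already 
has; a sketch (the value is only illustrative):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class IdleConfigSketch {
    public static Properties withTaskIdling(Properties props) {
        // Wait up to 1 second for all co-partitioned inputs to have data,
        // so records can be picked in timestamp order across the merged topics.
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 1000L);
        return props;
    }
}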

Thanks,
-John

On Mon, Sep 26, 2022, at 01:35, Tomasz Gac wrote:
> Hi group,
>
> I wrote a simple kafka streams application with topology such as below:
>
> builder.addStateStore(
>> Stores.keyValueStoreBuilder(
>> Stores.persistentKeyValueStore("STORE"),
>> Serdes.String(), Serdes.String())
>> .withLoggingEnabled(storeConfig))|
>
>
>
> builder.stream("TOPIC_1", Consumed.with(...))
>> .merge(builder.stream("TOPIC_2", Consumed.with(...))
>> .merge(builder.stream("TOPIC_3", Consumed.with(...))
>> .map(...) // stateless
>> .transform(..., "STORE")  // stateful
>
> .to("TOPIC_4");
>
>
> All input topics have 6 partitions, and for the purpose of testing, we are
> producing data to partition number 5.
> We are using kafka streams version 2.8.1, broker version 2.12-2.1.1
>
> The application works as expected when it has caught up to the lag, eg.
> when reset tool is used with --to-latest parameter.
> However, when the application is processing the messages starting from the
> earliest offset, the inputs are provided in batches such as:
>
>- ~1000 messages from TOPIC_1
>- ~1000 messages from TOPIC_2
>- ~1000 messages from TOPIC_3
>
> All of the messages have timestamps provided in headers, so I would expect
> the application to interleave the messages from these three topics so that
> their timestamps are in the ascending order.
> However, this is not the case that I am observing. The messages are
> processed in batches.
>
> How do I configure my application so that it processes messages in order
> when it is catching up to the lag?


Re: [VOTE] 3.3.0 RC2

2022-09-26 Thread John Roesler
Thanks for running this, David!

I've verified the signatures, looked at the docs, and run the quickstart (ZK 
and KRaft). I also ran the unit tests, as well as all the tests for Streams 
locally.

The docs look a little malformed (the "collapse/expand" button floats over the 
text, the collapsed doc tree is only halfway collapsed, and there's a weird 
empty panel on the right).

We can fix the docs site independent of this release, so I'm +1 (binding).

Thanks,
-John

On Tue, Sep 20, 2022, at 18:17, David Arthur wrote:
> Hello Kafka users, developers and client-developers,
>
> This is the second release candidate for Apache Kafka 3.3.0. Many new
> features and bug fixes are included in this major release of Kafka. A
> significant number of the issues in this release are related to KRaft,
> which will be considered "production ready" as part of this release
> (KIP-833)
>
> KRaft improvements:
> * KIP-778: Online KRaft to KRaft Upgrades
> * KIP-833: Mark KRaft as Production Ready
> * KIP-835: Monitor Quorum health (many new KRaft metrics)
> * KIP-836: Expose voter lag via kafka-metadata-quorum.sh
> * KIP-841: Fenced replicas should not be allowed to join the ISR in KRaft
> * KIP-859: Add Metadata Log Processing Error Related Metrics
>
> Other major improvements include:
> * KIP-618: Exactly-Once Support for Source Connectors
> * KIP-831: Add metric for log recovery progress
> * KIP-827: Expose logdirs total and usable space via Kafka API
> * KIP-834: Add ability to Pause / Resume KafkaStreams Topologies
>
> The full release notes are available here:
> https://home.apache.org/~davidarthur/kafka-3.3.0-rc2/RELEASE_NOTES.html
>
> Please download, test and vote by Monday, Sep 26 at 5pm EDT
>
> Also, huge thanks to José for running the release so far. He has done
> the vast majority of the work to prepare this rather large release :)
>
> -
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> https://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> https://home.apache.org/~davidarthur/kafka-3.3.0-rc2/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc: https://home.apache.org/~davidarthur/kafka-3.3.0-rc2/javadoc/
>
> * Tag to be voted upon (off 3.3 branch) is the 3.3.0 tag:
> https://github.com/apache/kafka/releases/tag/3.3.0-rc2
>
> * Documentation:  https://kafka.apache.org/33/documentation.html
>
> * Protocol: https://kafka.apache.org/33/protocol.html
>
>
>
>
> Successful Jenkins builds to follow in a future update to this email.
>
>
> Thanks!
> David Arthur


Re: UnsupportedOperationException: this should not happen: timestamp() is not supported in standby tasks.

2022-08-23 Thread John Roesler
Hi Suresh,

Sorry for the trouble!

Are you able to provide the rest of the stack trace?

It shouldn’t be possible to call put() on a store in a standby task, so we need 
to see the stack frames that show what is calling it. 

Thanks,
John

On Tue, Aug 23, 2022, at 05:08, Suresh Rukmangathan wrote:
> Hi,
>
> We have a Kafka streams application with "num.standby.replicas" = 1 and
> with replication-factor of 2 for that topic. The application is crashing
> with the following stack trace.
>
> java.lang.UnsupportedOperationException: this should not happen: timestamp() is not supported in standby tasks.
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.throwUnsupportedOperationExceptionIfStandby(ProcessorContextImpl.java:352)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.timestamp(ProcessorContextImpl.java:328)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.log(ChangeLoggingKeyValueBytesStore.java:136)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:78)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:32)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$4(MeteredKeyValueStore.java:197)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:197)
>     at ...
>
> Key Kafka streams application configuration details are as below:-
>
> {replication.factor=1, num.standby.replicas=1, topology.optimization=all,
>  producer.partitioner.class=com.hpe.gravity.gravitycommon.utils.Murmur3Partitioner,
>  max.request.size=1048576, auto.offset.reset=earliest}
>
> Is this a Kafka library issue or something wrong with the application
> configuration?
>
> thanks,
> -sureshr


Re: [ANNOUNCE] New Kafka PMC Member: A. Sophie Blee-Goldman

2022-08-02 Thread John Roesler
Congratulations, Sophie!

-John

On Tue, Aug 2, 2022, at 06:40, Chris Egerton wrote:
> Congrats, Sophie!
>
> On Mon, Aug 1, 2022 at 9:21 PM Luke Chen  wrote:
>
>> Congrats Sophie! :)
>>
>> Luke
>>
>> On Tue, Aug 2, 2022 at 7:56 AM Adam Bellemare 
>> wrote:
>>
>> > Congratulations Sophie! I’m glad to see you made as a PMC member! Well
>> > earned.
>> >
>> > > On Aug 1, 2022, at 6:42 PM, Guozhang Wang  wrote:
>> > >
>> > > Hi everyone,
>> > >
>> > > I'd like to introduce our new Kafka PMC member, Sophie. She has been a
>> > > committer since Oct. 2020 and has been contributing to the community
>> > > consistently, especially around Kafka Streams and Kafka java consumer.
>> > She
>> > > has also presented about Kafka Streams at Kafka Summit London this
>> year.
>> > It
>> > > is my pleasure to announce that Sophie agreed to join the Kafka PMC.
>> > >
>> > > Congratulations, Sophie!
>> > >
>> > > -- Guozhang Wang, on behalf of Apache Kafka PMC
>> >
>>


Re: question: kafka stream Tumbling Window can't close when no producer sending message

2022-07-28 Thread John Roesler
Hello,

Yes, this is correct. There is a difference between what we call “stream time” 
and regular “wall-clock time”.

All the windowing operations need to be deterministic, otherwise your results 
would depend on when you run your program. For that reason, we have “stream 
time”, which takes its clock from the incoming records’ timestamps instead of 
the system clock.

Stopping the producer means you also stop sending new record timestamps, and 
you have effectively paused the clock. As a consequence, the open windows can’t 
close (nor can other temporal operations continue).

If you’re trying to write a test, my suggestion is to send a dummy record (to 
each partition) with a much later timestamp, which will cause stream time to 
advance, close out all your windows, and flush your results to the outputs.
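
For example (a sketch for a test; the topic name, partition count, serializers, 
and timestamps here are placeholders), you could send one dummy record per 
partition with a timestamp well past your test data:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AdvanceStreamTime {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        long futureTimestamp = System.currentTimeMillis() + 60 * 60 * 1000L; // well past the test data
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            int numPartitions = 6; // placeholder: partition count of the source topic
            for (int p = 0; p < numPartitions; p++) {
                // One dummy record per partition advances stream time everywhere,
                // which lets the open windows close and emit their results.
                producer.send(new ProducerRecord<>("source-topic", p, futureTimestamp, "dummy", "dummy"));
            }
            producer.flush();
        }
    }
}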

Now that you know the terminology, you’ll be able to find documentation and 
presentations online about “stream time”. It’s a confusing but important aspect 
of stream processing. 

I hope this helps!
-John

On Thu, Jul 28, 2022, at 06:19, 张家帅 wrote:
> Dear Kafka managers:
>   When I use the Kafka Streams tumbling window, I found that if the
> producer sends messages to the source topic continuously, the stream
> TimeWindow works as expected. But once I stop the producer, the last
> window can't close until the producer sends the next message. Is this
> normal?
> My code is in the attachment.
> Please answer me when you have time, thanks a lot.
>
> Attachments:
> * KafkaStreamTests.java


Re: [ANNOUNCE] New Committer: Chris Egerton

2022-07-25 Thread John Roesler
Congratulations, Chris!!

-John

On Mon, Jul 25, 2022, at 20:22, Luke Chen wrote:
> Congratulations Chris! Well deserved!
>
> Luke
>
> On Tue, Jul 26, 2022 at 5:39 AM Anna McDonald 
> wrote:
>
>> Congratulations Chris! Time to Cellobrate!
>>
>> anna
>>
>> On Mon, Jul 25, 2022 at 4:23 PM Martin Gainty  wrote:
>>
>> > Congratulations Chris!
>> >
>> > martin~
>> > 
>> > From: Mickael Maison 
>> > Sent: Monday, July 25, 2022 12:25 PM
>> > To: dev ; Users 
>> > Subject: [ANNOUNCE] New Committer: Chris Egerton
>> >
>> > Hi all,
>> >
>> > The PMC for Apache Kafka has invited Chris Egerton as a committer, and
>> > we are excited to announce that he accepted!
>> >
>> > Chris has been contributing to Kafka since 2017. He has made over 80
>> > commits mostly around Kafka Connect. His most notable contributions
>> > include KIP-507: Securing Internal Connect REST Endpoints and KIP-618:
>> > Exactly-Once Support for Source Connectors.
>> >
>> > He has been an active participant in discussions and reviews on the
>> > mailing lists and on Github.
>> >
>> > Thanks for all of your contributions Chris. Congratulations!
>> >
>> > -- Mickael, on behalf of the Apache Kafka PMC
>> >
>>


Re: KStreams State Store - state.dir does not have .checkpoint file

2022-06-01 Thread John Roesler
Hi Neeraj,

Thanks for all that detail! Your expectation is correct. You should see the 
checkpoint files after a _clean_ shutdown, and then you should not see it 
bootstrap from the beginning of the changelog on the next startup.

How are you shutting down the application? You'll want to call 
KafkaStreams#close() and wait for it to return before stopping the Java process.
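
For example (a sketch; 'topology' and 'props' stand in for whatever your 
application already builds), a shutdown hook that closes Streams and waits for 
it is usually enough for the checkpoint files to appear:

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

public class CleanShutdownSketch {
    // 'topology' and 'props' stand in for whatever your application already builds.
    public static void run(Topology topology, Properties props) throws InterruptedException {
        KafkaStreams streams = new KafkaStreams(topology, props);
        CountDownLatch latch = new CountDownLatch(1);
        // On SIGTERM (e.g. docker stop), close Streams and wait for it to finish,
        // which is when the .checkpoint files get written out.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close(Duration.ofSeconds(30));
            latch.countDown();
        }));
        streams.start();
        latch.await();
    }
}

One thing to double-check in Docker: "docker stop" only waits 10 seconds by 
default before sending SIGKILL, so if the stores take longer to flush you may 
also need to raise that grace period.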

I hope this helps,
-John

On Tue, May 31, 2022, at 23:09, Neeraj Vaidya wrote:
> Hi All,
> I have a KStreams application running inside a Docker container which 
> uses a persistent key-value store. 
>
> I have configured state.dir with a value of /tmp/kafka-streams (which 
> is the default).
>
> When I start this container using "docker run", I mount 
> /tmp/kafka-streams to a directory on my host machine which is, say for 
> example, /mnt/storage/kafka-streams.
>
> My application.id is "myapp". I have 288 partitions in my input topic 
> which means my state store / changelog topic will also have that many 
> partitions. Accordingly, when I start my Docker container, I see that 
> there is a folder per task named with the partition number, such as 0_1, 
> 0_2, ..., 0_288, under /mnt/storage/kafka-streams/myapp/
>
> When I shutdown my application, I do not see any checkpoint file in any 
> of the partition directories.
>
> And when I restart my application, it starts fetching the records from 
> the changelog topic rather than reading from local disk. I suspect this 
> is because there is no .checkpoint file in any of the partition 
> directories. 
>
> This is what I see in the startup log. It seems to be bootstrapping the 
> entire state store from the changelog topic i.e. performing network I/O 
> rather than reading from what is on disk :
>
> "
> 2022-05-31T12:08:02.791 
> [mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN  
> o.a.k.s.p.i.ProcessorStateManager - MSG=stream-thread 
> [myapp-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] task [0_170] State store MyAppRecordStore did 
> not find checkpoint offsets while stores are not empty, since under EOS 
> it has the risk of getting uncommitted data in stores we have to treat it as a task corruption error and 
> wipe out the local state of task 0_170 before re-bootstrapping
> 2022-05-31T12:08:02.791 
> [myapp-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN  
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] Detected the states of tasks [0_170] are 
> corrupted. Will close the task as dirty and re-create and bootstrap 
> from scratch.
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [0_170] 
> are corrupted and hence needs to be re-initialized
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.initializeStoreOffsetsFromCheckpoint(ProcessorStateManager.java:254)
> at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:109)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:216)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:433)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:849)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:731)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
> "
>
> 1) Should I expect to see a checkpoint file in each of the partition 
> directories under /mnt/storage/kafka-streams/myapp/ when I shutdown my 
> application ?
>
> 2) Is this an issue because I am running my KStreams app inside a 
> docker container ? If there were permissions issues, then I would have 
> expected to see issues in creating the other files such as .lock or 
> rocksdb folder (and it's contents).
>
> My runtime environment is Docker 1.13.1 on RHEL 7.


Re: Subscribing to users@kafka.apache.org

2022-04-15 Thread John Roesler
Hi Lorcan,

Thanks for your interest!

The instructions for subscribing are available here: 
https://kafka.apache.org/contact

Thanks,
John

On Fri, Apr 15, 2022, at 11:28, Lorcan Cooke wrote:
> Hi,
>
>
> I would like to subscribe to users@kafka.apache.org please.
>
>
> Regards,
>
> —
> Lorcan


Re: [kafka-clients] [ANNOUNCE] Apache Kafka 3.0.1

2022-03-14 Thread John Roesler
Yes, thank you, Mickael!
-John

On Mon, 2022-03-14 at 12:19 +0100, Bruno Cadonna wrote:
> Thanks Mickael for driving this release!
> 
> Best,
> Bruno
> 
> On 14.03.22 11:42, Mickael Maison wrote:
> > The Apache Kafka community is pleased to announce the release for
> > Apache Kafka 3.0.1
> > 
> > Apache Kafka 3.0.1 is a bugfix release and 29 issues have been fixed
> > since 3.0.0.
> > 
> > All of the changes in this release can be found in the release notes:
> > https://www.apache.org/dist/kafka/3.0.1/RELEASE_NOTES.html
> > 
> > 
> > You can download the source and binary release (Scala 2.12 and 2.13) from:
> > https://kafka.apache.org/downloads#3.0.1
> > 
> > ---
> > 
> > 
> > Apache Kafka is a distributed streaming platform with four core APIs:
> > 
> > 
> > ** The Producer API allows an application to publish a stream of records to
> > one or more Kafka topics.
> > 
> > ** The Consumer API allows an application to subscribe to one or more
> > topics and process the stream of records produced to them.
> > 
> > ** The Streams API allows an application to act as a stream processor,
> > consuming an input stream from one or more topics and producing an
> > output stream to one or more output topics, effectively transforming the
> > input streams to output streams.
> > 
> > ** The Connector API allows building and running reusable producers or
> > consumers that connect Kafka topics to existing applications or data
> > systems. For example, a connector to a relational database might
> > capture every change to a table.
> > 
> > 
> > With these APIs, Kafka can be used for two broad classes of application:
> > 
> > ** Building real-time streaming data pipelines that reliably get data
> > between systems or applications.
> > 
> > ** Building real-time streaming applications that transform or react
> > to the streams of data.
> > 
> > 
> > Apache Kafka is in use at large and small companies worldwide, including
> > Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
> > Target, The New York Times, Uber, Yelp, and Zalando, among others.
> > 
> > A big thank you for the following 26 contributors to this release!
> > 
> > A. Sophie Blee-Goldman, Andras Katona, Bruno Cadonna, Chris Egerton,
> > Cong Ding, David Jacot, dengziming, Edoardo Comar, Ismael Juma, Jason
> > Gustafson, jiangyuan, Kevin Zhang, Konstantine Karantasis, Lee
> > Dongjin, Luke Chen, Marc Löhe, Matthias J. Sax, Michael Carter,
> > Mickael Maison, Oliver Hutchison, Philip Nee, Prateek Agarwal,
> > prince-mahajan, Rajini Sivaram, Randall Hauch, Walker Carlson
> > 
> > We welcome your help and feedback. For more information on how to
> > report problems, and to get involved, visit the project website at
> > https://kafka.apache.org/
> > 
> > Thank you!
> > 
> > 
> > Regards,
> > Mickael Maison
> > 



[ANNOUNCE] Apache Kafka 2.8.0

2021-04-19 Thread John Roesler
The Apache Kafka community is pleased to announce the
release for Apache Kafka 2.8.0

Kafka 2.8.0 includes a number of significant new features.
Here is a summary of some notable changes:

* Early access of replacing ZooKeeper with a self-managed
quorum
* Add Describe Cluster API
* Support mutual TLS authentication on SASL_SSL listeners
* JSON request/response debug logs
* Limit broker connection creation rate
* Topic identifiers
* Expose task configurations in Connect REST API
* Update Streams FSM to clarify ERROR state meaning
* Extend StreamJoined to allow more store configs
* More convenient TopologyTestDriver constructors
* Introduce Kafka-Streams-specific uncaught exception
handler
* API to start and shut down Streams threads
* Improve TimeWindowedDeserializer and TimeWindowedSerde to
handle window size
* Improve timeouts and retries in Kafka Streams

All of the changes in this release can be found in the
release notes:
https://www.apache.org/dist/kafka/2.8.0/RELEASE_NOTES.html


You can download the source and binary release (Scala 2.12
and 2.13) from:
https://kafka.apache.org/downloads#2.8.0


---


Apache Kafka is a distributed streaming platform with four
core APIs:


** The Producer API allows an application to publish a
stream of records to one or more Kafka topics.

** The Consumer API allows an application to subscribe to
one or more topics and process the stream of records
produced to them.

** The Streams API allows an application to act as a stream
processor, consuming an input stream from one or more topics
and producing an output stream to one or more output topics,
effectively transforming the input streams to output
streams.

** The Connector API allows building and running reusable
producers or consumers that connect Kafka topics to existing
applications or data systems. For example, a connector to a
relational database might capture every change to a table.


With these APIs, Kafka can be used for two broad classes of
application:

** Building real-time streaming data pipelines that reliably
get data between systems or applications.

** Building real-time streaming applications that transform
or react to the streams of data.


Apache Kafka is in use at large and small companies
worldwide, including Capital One, Goldman Sachs, ING,
LinkedIn, Netflix, Pinterest, Rabobank, Target, The New York
Times, Uber, Yelp, and Zalando, among others.

A big thank you for the following 128 contributors to this
release!

17hao, abc863377, Adem Efe Gencer, Alexander Iskuskov, Alok
Nikhil, Anastasia Vela, Andrew Lee, Andrey Bozhko, Andrey
Falko, Andy Coates, Andy Wilkinson, Ankit Kumar, APaMio,
Arjun Satish, ArunParthiban-ST, A. Sophie Blee-Goldman,
Attila Sasvari, Benoit Maggi, bertber, bill, Bill Bejeck,
Bob Barrett, Boyang Chen, Brajesh Kumar, Bruno Cadonna,
Cheng Tan, Chia-Ping Tsai, Chris Egerton, CHUN-HAO TANG,
Colin Patrick McCabe, Colin P. Mccabe, Cyrus Vafadari, David
Arthur, David Jacot, David Mao, dengziming, Dhruvil Shah,
Dima Reznik, Dongjoon Hyun, Dongxu Wang, Emre Hasegeli,
feyman2016, fml2, Gardner Vickers, Geordie, Govinda Sakhare,
Greg Harris, Guozhang Wang, Gwen Shapira, Hamza Slama,
high.lee, huxi, Igor Soarez, Ilya Ganelin, Ismael Juma, Ivan
Ponomarev, Ivan Yurchenko, jackyoh, James Cheng, James
Yuzawa, Jason Gustafson, Jesse Gorzinski, Jim Galasyn, John
Roesler, Jorge Esteban Quilcate Otoya, José Armando García
Sancio, Julien Chanaud, Julien Jean Paul Sirocchi, Justine
Olshan, Kengo Seki, Kowshik Prakasam, leah, Lee Dongjin,
Levani Kokhreidze, Lev Zemlyanov, Liju John, Lincong Li,
Lucas Bradstreet, Luke Chen, Manikumar Reddy, Marco Aurelio
Lotz, mathieu, Matthew Wong, Matthias J. Sax, Matthias
Merdes, Michael Bingham, Michael G. Noll, Mickael Maison,
Montyleo, mowczare, Nikolay, Nikolay Izhikov, Ning Zhang,
Nitesh Mor, Okada Haruki, panguncle, parafiend, Patrick
Dignan, Prateek Agarwal, Prithvi, Rajini Sivaram, Raman
Verma, Ramesh Krishnan M, Randall Hauch, Richard
Fussenegger, Rohan, Rohit Deshpande, Ron Dagostino, Samuel
Cantero, Sanket Fajage, Scott Hendricks, Shao Yang Hong,
ssugar, Stanislav Kozlovski, Stanislav Vodetskyi, tang7526,
Thorsten Hake, Tom Bentley, vamossagar12, Viktor Somogyi-
Vass, voffcheg109, Walker Carlson, wenbingshen, wycc,
xakassi, Xavier Léauté, Yilong Chang, zhangyue19921010

We welcome your help and feedback. For more information on
how to report problems, and to get involved, visit the
project website at https://kafka.apache.org/

Thank you!


Regards,

John Roesler



Re: [kafka-clients] Re: Subject: [VOTE] 2.8.0 RC1

2021-04-14 Thread John Roesler
Thanks for the feedback, all.

I am now closing this vote thread in favor of the vote on
2.8.0 RC2.

Thanks,
John

On Mon, 2021-04-12 at 21:24 -0400, Israel Ekpo wrote:
> No problem, I will assign in to you shortly.
> 
> https://issues.apache.org/jira/browse/KAFKA-12658
> 
> On Mon, Apr 12, 2021 at 8:47 PM John Roesler
>  wrote:
> > Good catch, Israel!
> > 
> > I’ll make sure that gets fixed. 
> > 
> > Thanks,
> > John
> > 
> > On Mon, Apr 12, 2021, at 19:30, Israel Ekpo wrote:
> > > I just noticed that with the latest release candidate,
> > the binaries from
> > > the Scala 2.13 and 2.12 tarballs are not finding the
> > class for the meta
> > > data shell
> > > 
> > > https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/
> > > 
> > > It looks like kafka-run-class.sh is not able to load
> > it.
> > > 
> > > Is this a known issue? Should I open an issue to track
> > it?
> > > 
> > > isekpo@MININT-5RPA920:/mnt/c/Users/isekpo/kafka_2.12-
> > 2.8.0$
> > > bin/kafka-metadata-shell.sh --help
> > > Error: Could not find or load main class
> > > org.apache.kafka.shell.MetadataShell
> > > Caused by: java.lang.ClassNotFoundException:
> > > org.apache.kafka.shell.MetadataShell
> > > 
> > > isekpo@MININT-5RPA920:/mnt/c/Users/isekpo/kafka_2.12-
> > 2.8.0$ cd
> > > ../kafka_2.13-2.8.0/
> > > 
> > > isekpo@MININT-5RPA920:/mnt/c/Users/isekpo/kafka_2.13-
> > 2.8.0$
> > > bin/kafka-metadata-shell.sh --help
> > > Error: Could not find or load main class
> > > org.apache.kafka.shell.MetadataShell
> > > Caused by: java.lang.ClassNotFoundException:
> > > org.apache.kafka.shell.MetadataShell
> > > 
> > > 
> > > 
> > > On Fri, Apr 9, 2021 at 4:52 PM Bill Bejeck
> >  wrote:
> > > 
> > > > Hi John,
> > > >
> > > > Thanks for running the 2.8.0 release!
> > > >
> > > > I've started to validate it and noticed the site-
> > docs haven't been
> > > > installed to
> > https://kafka.apache.org/28/documentation.html yet.
> > > >
> > > > Thanks again!
> > > >
> > > > -Bill
> > > >
> > > > On Tue, Apr 6, 2021 at 5:37 PM John Roesler
> >  wrote:
> > > >
> > > >> Hello Kafka users, developers and client-
> > developers,
> > > >>
> > > >> This is the second candidate for release of Apache
> > Kafka
> > > >> 2.8.0. This is a major release that includes many
> > new
> > > >> features, including:
> > > >>
> > > >> * Early-access release of replacing Zookeeper with
> > a self-
> > > >> managed quorum
> > > >> * Add Describe Cluster API
> > > >> * Support mutual TLS authentication on SASL_SSL
> > listeners
> > > >> * Ergonomic improvements to Streams
> > TopologyTestDriver
> > > >> * Logger API improvement to respect the hierarchy
> > > >> * Request/response trace logs are now JSON-
> > formatted
> > > >> * New API to add and remove Streams threads while
> > running
> > > >> * New REST API to expose Connect task
> > configurations
> > > >> * Fixed the TimeWindowDeserializer to be able to
> > deserialize
> > > >> keys outside of Streams (such as in the console
> > consumer)
> > > >> * Streams resilient improvement: new uncaught
> > exception
> > > >> handler
> > > >> * Streams resilience improvement: automatically
> > recover from
> > > >> transient timeout exceptions
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> Release notes for the 2.8.0 release:
> > > >>
> >
> https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/RELEASE_NOTES.html
> > > >>
> > > >>
> > > >> *** Please download, test and vote by 6 April 2021
> > ***
> > > >>
> > > >> Kafka's KEYS file containing PGP keys we use to
> > sign the
> > > >> release:
> > > >> https://kafka.apache.org/KEYS
> > > >>
> > > >> * Release artifacts to be voted upon (source and
> > binary):
> > > >>
> > > >> https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/
> > > >>

[VOTE] 2.8.0 RC2

2021-04-14 Thread John Roesler
Hello Kafka users, developers and client-developers,

This is the third candidate for release of Apache Kafka
2.8.0. This is a major release that includes many new
features, including:

 * Early-access release of replacing Zookeeper with a
   self-managed quorum
 * Add Describe Cluster API
 * Support mutual TLS authentication on SASL_SSL listeners
 * Ergonomic improvements to Streams TopologyTestDriver
 * Logger API improvement to respect the hierarchy
 * Request/response trace logs are now JSON-formatted
 * New API to add and remove Streams threads while running
 * New REST API to expose Connect task configurations
 * Fixed the TimeWindowDeserializer to be able to
   deserialize
   keys outside of Streams (such as in the console consumer)
 * Streams resilient improvement: new uncaught exception
   handler
 * Streams resilience improvement: automatically recover
   from transient timeout exceptions

Release notes for the 2.8.0 release:
https://home.apache.org/~vvcephei/kafka-2.8.0-rc2/RELEASE_NOTES.html

*** Please download, test and vote by 19 April 2021 ***

Kafka's KEYS file containing PGP keys we use to sign the
release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~vvcephei/kafka-2.8.0-rc2/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
https://home.apache.org/~vvcephei/kafka-2.8.0-rc2/javadoc/

* Tag to be voted upon (off 2.8 branch) is the 2.8.0 tag:
https://github.com/apache/kafka/releases/tag/2.8.0-rc2

* Documentation:
https://kafka.apache.org/28/documentation.html

* Protocol:
https://kafka.apache.org/28/protocol.html

* Successful Jenkins builds for the 2.8 branch:
Unit/integration tests:
https://ci-builds.apache.org/job/Kafka/job/kafka/job/2.8/
(still flaky)

System tests:
https://jenkins.confluent.io/job/system-test-kafka/job/2.8/60/
http://confluent-kafka-2-8-system-test-results.s3-us-west-2.amazonaws.com/2021-04-14--001.1618407001--confluentinc--2.8--1b61272d45/report.html

/**

Thanks,
John




Re: [kafka-clients] Re: Subject: [VOTE] 2.8.0 RC1

2021-04-12 Thread John Roesler
Good catch, Israel!

I’ll make sure that gets fixed. 

Thanks,
John

On Mon, Apr 12, 2021, at 19:30, Israel Ekpo wrote:
> I just noticed that with the latest release candidate, the binaries from
> the Scala 2.13 and 2.12 tarballs are not finding the class for the meta
> data shell
> 
> https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/
> 
> It looks like kafka-run-class.sh is not able to load it.
> 
> Is this a known issue? Should I open an issue to track it?
> 
> isekpo@MININT-5RPA920:/mnt/c/Users/isekpo/kafka_2.12-2.8.0$
> bin/kafka-metadata-shell.sh --help
> Error: Could not find or load main class
> org.apache.kafka.shell.MetadataShell
> Caused by: java.lang.ClassNotFoundException:
> org.apache.kafka.shell.MetadataShell
> 
> isekpo@MININT-5RPA920:/mnt/c/Users/isekpo/kafka_2.12-2.8.0$ cd
> ../kafka_2.13-2.8.0/
> 
> isekpo@MININT-5RPA920:/mnt/c/Users/isekpo/kafka_2.13-2.8.0$
> bin/kafka-metadata-shell.sh --help
> Error: Could not find or load main class
> org.apache.kafka.shell.MetadataShell
> Caused by: java.lang.ClassNotFoundException:
> org.apache.kafka.shell.MetadataShell
> 
> 
> 
> On Fri, Apr 9, 2021 at 4:52 PM Bill Bejeck  wrote:
> 
> > Hi John,
> >
> > Thanks for running the 2.8.0 release!
> >
> > I've started to validate it and noticed the site-docs haven't been
> > installed to https://kafka.apache.org/28/documentation.html yet.
> >
> > Thanks again!
> >
> > -Bill
> >
> > On Tue, Apr 6, 2021 at 5:37 PM John Roesler  wrote:
> >
> >> Hello Kafka users, developers and client-developers,
> >>
> >> This is the second candidate for release of Apache Kafka
> >> 2.8.0. This is a major release that includes many new
> >> features, including:
> >>
> >> * Early-access release of replacing Zookeeper with a self-
> >> managed quorum
> >> * Add Describe Cluster API
> >> * Support mutual TLS authentication on SASL_SSL listeners
> >> * Ergonomic improvements to Streams TopologyTestDriver
> >> * Logger API improvement to respect the hierarchy
> >> * Request/response trace logs are now JSON-formatted
> >> * New API to add and remove Streams threads while running
> >> * New REST API to expose Connect task configurations
> >> * Fixed the TimeWindowDeserializer to be able to deserialize
> >> keys outside of Streams (such as in the console consumer)
> >> * Streams resilient improvement: new uncaught exception
> >> handler
> >> * Streams resilience improvement: automatically recover from
> >> transient timeout exceptions
> >>
> >>
> >>
> >>
> >> Release notes for the 2.8.0 release:
> >> https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/RELEASE_NOTES.html
> >>
> >>
> >> *** Please download, test and vote by 6 April 2021 ***
> >>
> >> Kafka's KEYS file containing PGP keys we use to sign the
> >> release:
> >> https://kafka.apache.org/KEYS
> >>
> >> * Release artifacts to be voted upon (source and binary):
> >>
> >> https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/
> >>
> >> * Maven artifacts to be voted upon:
> >> https://repository.apache.org/content/groups/staging/org/apache/kafka/
> >>
> >> * Javadoc:
> >>
> >> https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/javadoc/
> >>
> >> * Tag to be voted upon (off 2.8 branch) is the 2.8.0 tag:
> >>
> >> https://github.com/apache/kafka/releases/tag/2.8.0-rc1
> >>
> >> * Documentation:
> >> https://kafka.apache.org/28/documentation.html
> >>
> >> * Protocol:
> >> https://kafka.apache.org/28/protocol.html
> >>
> >>
> >> /**
> >>
> >> Thanks,
> >> John
> >>
> >>
> >>
>


Subject: [VOTE] 2.8.0 RC1

2021-04-06 Thread John Roesler
Hello Kafka users, developers and client-developers,

This is the second candidate for release of Apache Kafka
2.8.0. This is a major release that includes many new
features, including:

* Early-access release of replacing Zookeeper with a self-
managed quorum
* Add Describe Cluster API
* Support mutual TLS authentication on SASL_SSL listeners
* Ergonomic improvements to Streams TopologyTestDriver
* Logger API improvement to respect the hierarchy
* Request/response trace logs are now JSON-formatted
* New API to add and remove Streams threads while running
* New REST API to expose Connect task configurations
* Fixed the TimeWindowDeserializer to be able to deserialize
keys outside of Streams (such as in the console consumer)
* Streams resilient improvement: new uncaught exception
handler
* Streams resilience improvement: automatically recover from
transient timeout exceptions




Release notes for the 2.8.0 release:
https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/RELEASE_NOTES.html


*** Please download, test and vote by 6 April 2021 ***

Kafka's KEYS file containing PGP keys we use to sign the
release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):

https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:

https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/javadoc/

* Tag to be voted upon (off 2.8 branch) is the 2.8.0 tag:

https://github.com/apache/kafka/releases/tag/2.8.0-rc1

* Documentation:
https://kafka.apache.org/28/documentation.html

* Protocol:
https://kafka.apache.org/28/protocol.html


/**

Thanks,
John





Re: enforceRebalance using kafka admin APIs anf not consumer client API.

2021-04-01 Thread John Roesler
Hi Mazen,

This sounds like a good use case. If you’d like, you can start a KIP to add an 
enforceRebalance() method to the admin client interface. Feel free to ask here 
for any guidance on the KIP process itself.

Regarding your usage, you will actually have to call poll(). That is the point 
at which broker communication actually takes place. 
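
To make that concrete, here is a rough sketch of the pattern (broker address, group, and topic names are just placeholders for your own setup):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RebalanceTrigger {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "monitored-group");          // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("monitored-topic")); // placeholder
            // The first poll() is what actually joins the group (which itself causes a rebalance).
            consumer.poll(Duration.ofSeconds(5));

            // Later, whenever your monitoring logic decides another rebalance is needed:
            consumer.enforceRebalance();
            // The rejoin request is only sent to the group coordinator on the next poll().
            consumer.poll(Duration.ofSeconds(5));
        }
    }
}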

I hope this helps,
John

On Mon, Mar 29, 2021, at 01:44, Mazen Ezzeddine wrote:
> Dear all,
> I am using Kafka admin client where I am performing some computations 
> on the offsets (committed, last, …) to derive per-partition and 
> per-consumer information (on a certain consumer group) such as arrival 
> rate, lag, consumption rate etc… when certain conditions are met I want 
> to enforce rebalance on the consumer group being monitored.
> The issue is that the enforceRebalance is a consumer client API (and 
> not an admin client API), so could you please provide information on the 
> most seamless way to enforce a rebalance on the consumer group in a situation 
> like the above.
> 
> Note that I tried workarounds like dynamically adding a new consumer 
> through Kubernetes client API  (as my cluster and clients are running 
> on kubernetes) to trigger a rebalance but preferably I don’t want to 
> use this workaround.
> 
> If I added to my admin client class a kafka consumer to be a member of 
> the consumer group without polling (calling poll) just for the purpose 
> of triggering a rebalance using the consumer API would that work. 
> Indeed I tried this scenario and it looks like no compilation or 
> runtime errors but I am not seeing in the logs that a group rebalance 
> has taken place ? am I missing something? any hint please?
> 
> Thank you.
> 
>


Re: [VOTE] 2.8.0 RC0

2021-03-30 Thread John Roesler
Hello again, all,

I just wanted to mention that I am aware of Justin's
concerns in the 2.6.2 thread:
https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E

I plan to make sure we address these concerns before the
actual 2.8.0 release, but wanted to get RC0 out asap for
testing.

Thank you,
John

On Tue, 2021-03-30 at 16:37 -0500, John Roesler wrote:
> Hello Kafka users, developers and client-developers,
> 
> This is the first candidate for release of Apache Kafka
> 2.8.0. This is a major release that includes many new
> features, including:
> 
> * Early-access release of replacing Zookeeper with a self-
> managed quorum
> * Add Describe Cluster API
> * Support mutual TLS authentication on SASL_SSL listeners
> * Ergonomic improvements to Streams TopologyTestDriver
> * Logger API improvement to respect the hierarchy
> * Request/response trace logs are now JSON-formatted
> * New API to add and remove Streams threads while running
> * New REST API to expose Connect task configurations
> * Fixed the TimeWindowDeserializer to be able to deserialize
> keys outside of Streams (such as in the console consumer)
> * Streams resilient improvement: new uncaught exception
> handler
> * Streams resilience improvement: automatically recover from
> transient timeout exceptions
> 
> 
> 
> Release notes for the 2.8.0 release:
> https://home.apache.org/~vvcephei/kafka-2.8.0-rc0/RELEASE_NOTES.html
> 
> *** Please download, test and vote by 6 April 2021 ***
> 
> Kafka's KEYS file containing PGP keys we use to sign the
> release:
> https://kafka.apache.org/KEYS
> 
> * Release artifacts to be voted upon (source and binary):
> https://home.apache.org/~vvcephei/kafka-2.8.0-rc0/
> 
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
> 
> * Javadoc:
> https://home.apache.org/~vvcephei/kafka-2.8.0-rc0/javadoc/
> 
> * Tag to be voted upon (off 2.8 branch) is the 2.8.0 tag:
> https://github.com/apache/kafka/releases/tag/2.8.0-rc0
> 
> * Documentation:
> https://kafka.apache.org/28/documentation.html
> 
> * Protocol:
> https://kafka.apache.org/28/protocol.html
> 
> 
> /**
> 
> Thanks,
> John
> 
> 




[VOTE] 2.8.0 RC0

2021-03-30 Thread John Roesler
Hello Kafka users, developers and client-developers,

This is the first candidate for release of Apache Kafka
2.8.0. This is a major release that includes many new
features, including:

* Early-access release of replacing Zookeeper with a self-
managed quorum
* Add Describe Cluster API
* Support mutual TLS authentication on SASL_SSL listeners
* Ergonomic improvements to Streams TopologyTestDriver
* Logger API improvement to respect the hierarchy
* Request/response trace logs are now JSON-formatted
* New API to add and remove Streams threads while running
* New REST API to expose Connect task configurations
* Fixed the TimeWindowDeserializer to be able to deserialize
keys outside of Streams (such as in the console consumer)
* Streams resilient improvement: new uncaught exception
handler
* Streams resilience improvement: automatically recover from
transient timeout exceptions



Release notes for the 2.8.0 release:
https://home.apache.org/~vvcephei/kafka-2.8.0-rc0/RELEASE_NOTES.html

*** Please download, test and vote by 6 April 2021 ***

Kafka's KEYS file containing PGP keys we use to sign the
release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~vvcephei/kafka-2.8.0-rc0/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
https://home.apache.org/~vvcephei/kafka-2.8.0-rc0/javadoc/

* Tag to be voted upon (off 2.8 branch) is the 2.8.0 tag:
https://github.com/apache/kafka/releases/tag/2.8.0-rc0

* Documentation:
https://kafka.apache.org/28/documentation.html

* Protocol:
https://kafka.apache.org/28/protocol.html


/**

Thanks,
John




Re: Processor API in 2.7

2021-03-22 Thread John Roesler
Hi Ross,

Thanks for the feedback!

For some "context," the change you mention was:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API

In addition to the spec on that page, there are links to the
discussion and voting mailing list threads.

The primary motivation was to go from a prior state in which
there was absolutely no safety or checking possible
regarding which types a parent should pass to a child or a
child should accept from the parent. This was a practical
choice: there have been several subtle bugs in Streams DSL
features in the last several releases, in which some edge
case causes the feature's processors to forward a wrongly-
typed record downstream.

I was trying pretty hard actually _not_ to design a whole
new "V2" of the PAPI, since I really wanted to fix a design
flaw that had cost us hundreds of hours of effort, both as
maintainers and as users, and I wanted to fix it as fast as
possible. Sadly, even though I was trying to be as
incremental as possible, it still took a very long time to
get from proposal to implementation, and unfortunately, the
process of migrating the DSL's internal code is still not
complete. This seems to suggest that it was actually a good
choice not to bring in even more of a grand vision here, as
there would have been even more delays.

Ok, that's enough self-justification ;)


Your appraisal is correct. The new PAPI typing does not
handle the case of child nodes of different types, and if
that's your situation, you will indeed have to set the
outbound types to a supertype of all children's inbound
types (such as Object).
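
To make that concrete, a rough sketch of the workaround might look like this (the child names and value types are invented for illustration):

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Outbound types are widened to Object because the two children expect different value types.
public class FanOutProcessor implements Processor<String, String, Object, Object> {
    private ProcessorContext<Object, Object> context;

    @Override
    public void init(final ProcessorContext<Object, Object> context) {
        this.context = context;
    }

    @Override
    public void process(final Record<String, String> record) {
        // child "lengths" consumes <String, Integer>
        context.forward(record.withValue(record.value().length()), "lengths");
        // child "upper" consumes <String, String>
        context.forward(record.withValue(record.value().toUpperCase()), "upper");
    }
}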

I like the spirit of your proposal, and I agree it would be
good to move in a direction of having good type safety for
your use case. If we can continue to evolve the API via
additions and deprecations, it will be less disruptive for
the overall community of users, but indeed for some kinds of
changes, a clean break is better.

It looks like your specific proposal could be achieved in
place, but it also seems like you have a pretty clear idea
of where you'd like to see the API go in the future.
Actually we have the 3.0 release coming up, so it would be a
favorable time for you to promote your vision.

To help everyone discuss both large and small proposals
unambiguously, we have a formal process for design reviews,
called KIPs:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

If you do want to move forward with a proposal, please feel
free to start drafting a KIP. If you need any help
navigating that process, please reach out to us on the dev
mailing list, or of course, we can continue to use this
thread.

I really appreciate your willingness to share your feedback
and ideas.

Thank you,
John



On Mon, 2021-03-22 at 14:46 +1100, Ross Black wrote:
> Hi,
> 
> I wanted to provide some feedback about the new API introduced in Kafka
> streaming 2.7 for adding a Processor to the Topology (using PAPI).
> 
> Prior to 2.7, the Processor was typed only by the input key and value types <K, V>,
> and the ProcessorContext was untyped.
> 
> In 2.7 ProcessorSupplier & Processor now have the additional params <...,
> KForward, VForward> and the ProcessorContext also has <KForward, VForward>.  The *only* use of this appears to be to provide some typing for
> the "forward" functions.
> One of my common use cases is where I have multiple child nodes each of
> which has a different output <K, V>.  To migrate to the new 2.7 API seems to
> require adding <.., Object, Object> parameters to every type which looks
> noisy in the code and does nothing for improving type safety of the API.
> 
> I would like to ask why the new type parameters were added when they do not
> actually achieve typing for the multiple child use-case?
> 
> Personally, I would prefer new methods on the ProcessorContext that allow
> me to create a "typed" forwarder per child node.
> 
> e.g.
> 
> public interface ProcessorContext<KIn, VIn> {
> 
> /** Default forwarder. */
> <K, V> Forwarder<K, V> forwarder();
> 
> /** Forwarder for a specific child. */
> <K, V> Forwarder<K, V> forwarder(final String childName);
> 
> }
> 
> 
> 
> public interface Forwarder<K, V> {
> void forward(Record<K, V> record);
> }
> 
> 
> With my suggested approach you at least get some typing per-topic (child
> node).  To me this also feels like it better represents the domain model of
> single stream input with multiple outputs.  Forwarder also feels like a
> real entity in the model (similar to Producer), rather than just being a
> method hacked into the context.
> 
> 
> To me, the new 2.7 API feels like a step backwards.  I also get the
> impression that the streaming API continues to break each release without a
> particularly strong model / design behind it (particularly the sometimes
> jarring disconnect between PAPI and streaming API).
> Having learned from initial implementations, is it time to take a step back
> to design a "V2" streaming API which blends the existing PAPI + streaming
> API?
> I am curious 

Re: Kafka 2.7.0 processor API and streams-test-utils changes

2021-02-06 Thread John Roesler
Hello Upesh,

I’m sorry for the trouble. This was my feature, and my oversight. I will update 
the docs on Monday. 

The quick answer is that there is also a new api.MockProcessorContext, which is 
compatible with the new interface. That class also provides 
getStateStoreContext to use with the state stores: 
https://kafka.apache.org/27/javadoc//org/apache/kafka/streams/processor/api/MockProcessorContext.html#getStateStoreContext--

Here is an example of how to use it:

https://github.com/apache/kafka/blob/trunk/streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java#L57-L58
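
In the same spirit, here is a rough self-contained sketch with a key-value store and a trivial counting processor (the store name and the processor logic are made up):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class CountingProcessorTest {

    public void shouldCountValues() {
        final MockProcessorContext<String, Long> context = new MockProcessorContext<>();

        final KeyValueStore<String, Long> store =
            Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore("counts"),
                    Serdes.String(), Serdes.Long())
                .withLoggingDisabled()  // changelogging is not supported by MockProcessorContext
                .withCachingDisabled()  // caching is not supported by MockProcessorContext
                .build();
        store.init(context.getStateStoreContext(), store);
        context.getStateStoreContext().register(store, null);

        final Processor<String, String, String, Long> processor =
            new Processor<String, String, String, Long>() {
                private ProcessorContext<String, Long> ctx;
                private KeyValueStore<String, Long> counts;

                @Override
                public void init(final ProcessorContext<String, Long> context) {
                    ctx = context;
                    counts = context.getStateStore("counts");
                }

                @Override
                public void process(final Record<String, String> record) {
                    final Long prior = counts.get(record.value());
                    final long updated = prior == null ? 1L : prior + 1L;
                    counts.put(record.value(), updated);
                    ctx.forward(new Record<>(record.value(), updated, record.timestamp()));
                }
            };

        processor.init(context);
        processor.process(new Record<>("key", "value", 0L));

        // assert against store.get("value") and context.forwarded() here
    }
}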

I hope this helps, and I’ll fix the docs ASAP. 

Thanks,
John

On Fri, Feb 5, 2021, at 17:33, Guozhang Wang wrote:
> Hello Upesh,
> 
> Thanks for the report! I think this is overlooked to update the
> documentation with the new 2.7.0 release. Could you file a JIRA (or even
> better, provide a PR with the JIRA :) to update the docs?
> 
> 
> Guozhang
> 
> On Thu, Feb 4, 2021 at 1:03 PM Upesh Desai  wrote:
> 
> > Hello,
> >
> >
> >
> > I recently upgraded our Kafka components to 2.7.0, and noticed the changes
> > to the processor API. Specifically, the additions of:
> >
> >
> >
> >- org.apache.kafka.streams.processor.api.Processor
> >- org.apache.kafka.streams.processor.api.ProcessorContext
> >
> >
> >
> > The old Topology.addProcessor() method has been deprecated, which is what
> > led me to finding the new classes. After porting our code to the updated
> > processor API, we noticed issues with the Processor unit tests, which had
> > been written follow this documentation exactly:
> >
> >
> >
> >
> > https://kafka.apache.org/27/documentation/streams/developer-guide/testing.html#unit-testing-processors
> >
> >
> >
> > However, it seems that the MockProcessorContext and possibly other test
> > suite classes have not been updated for the new API changes, such as the
> > following methods:
> >
> >
> >
> > store.init(context, store);
> >
> > context.register(store, null);
> >
> >
> >
> > Can someone point me in the right direction if this has indeed been
> > changed/fixed or need to raise an issue to have this updated in the next
> > release?
> >
> >
> >
> > Cheers!
> > 
> > Upesh Desai​
> > Senior Software Developer
> > *ude...@itrsgroup.com* 
> > *www.itrsgroup.com* 
> 
> 
> -- 
> -- Guozhang
>


Re: Event Sourcing with Kafka Streams and processing order of a re-entrant pipeline

2021-01-31 Thread John Roesler
Hi David,

Thank you for the question.

If I can confirm, it looks like the "operations" topic is
the only input to the topology, and the topology reads the
"operations" topic joined with the "account" table and
generates a "movements" stream. It reads (and aggregates)
the "movements" stream to create the "account" table.

I think your concern is dead-on. The processing of incoming
records from either the "operations" topic or the
"movements" topic is synchronous, BUT the production of
messages to the "movements" topic and subsequent consumption
of those messages in the join is asynchronous. In other
words, you can indeed get "insufficient funds" in your
application.

Following your scenario, this can happen:

1. consume Operation(10)
1. join (balance is $0)
1. produce Movement(10)

2. consume Operation(10)
2. join (balance is $0)
2. produce Movement(10)

3. consume Movement(10) // the first one
3. update table to value $10

4. consume Operation(-20)
4. join (balance is $10) // only seen the first one
4. produce Movement(0, "insufficient funds")

5. consume Movement(10) // the second one (too late)
5. update table to value $20

6. consume Movement(0, "insufficient funds")
...

Other interleavings are also possible.

To get synchronous processing, what you want is a single
subtopology where all the data flows are internal (i.e.,
that re-entrant topic is the source of the race condition).

If you don't know about it already, you can print out your
topology and visualize it with
https://zz85.github.io/kafka-streams-viz/ .

Technically, you can actually model this as a single
aggregation:

operationStream
 .groupByKey
 .aggregate(Account(0)){ (_, operation, account) =>
  if (account.balance >= -operation.amount) {
   account.copy(balance=account.balance+operation.amount)
  } else {
   account.copy(error="Insufficient funds")
  }
 }
 .toStream.to(topicAccounts)

But if you want to decouple the "insufficient funds" error
handling from the account maintenance, you might look at a
ValueTransformer, in which you maintain the Accounts in a
key/value store and then forward an Either[Account, Error]
result, which you can then direct however you please.
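
As a very rough sketch of that shape (in Java, with placeholder topic names, and a plain String result standing in for the Either):

import java.util.Optional;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class BankAccountSketch {

    static void build(final StreamsBuilder builder) {
        builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("accounts"),
            Serdes.String(), Serdes.Integer()));

        final KStream<String, Integer> operations =
            builder.stream("operations", Consumed.with(Serdes.String(), Serdes.Integer()));

        final ValueTransformerWithKeySupplier<String, Integer, String> accountLogic =
            () -> new ValueTransformerWithKey<String, Integer, String>() {
                private KeyValueStore<String, Integer> accounts;

                @SuppressWarnings("unchecked")
                @Override
                public void init(final ProcessorContext context) {
                    accounts = (KeyValueStore<String, Integer>) context.getStateStore("accounts");
                }

                @Override
                public String transform(final String accountId, final Integer amount) {
                    // validate and update the balance in one synchronous step
                    final int balance = Optional.ofNullable(accounts.get(accountId)).orElse(0);
                    if (balance + amount >= 0) {
                        accounts.put(accountId, balance + amount);
                        return "movement:" + amount;
                    } else {
                        return "error:insufficient funds";
                    }
                }

                @Override
                public void close() {}
            };

        operations
            .transformValues(accountLogic, "accounts")
            .to("movements", Produced.with(Serdes.String(), Serdes.String()));
    }
}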

Either way, maintaining the table and the balance checking
logic in a single operation guarantees you won't have race
conditions.

A final note: the reason your tests don't show the race
condition is that they are using TopologyTestDriver, which
synchronously propagates each individual input record all
the way through the topology. If you also set up a full
integration test, I suspect that you'll quickly see the race
condition surface.

I hope this helps,
-John


On Sun, 2021-01-31 at 11:37 +0100, Davide Icardi wrote:
> I'm working on a project where I want to use Kafka Streams for Event
> Sourcing.
> 
> General idea is that I have a "commands" topic/KStream, an "events"
> topic/KStream and a "snapshots" topic/KTable.
> Snapshots contains the current state of the entities. Commands are
> validated using the "snapshots" and transformed to "events".
> 
> Group EVENTS stream by key and aggregate them to a SNAPSHOTS table.
> Left join COMMANDS stream with the SNAPSHOTS table and output new
> EVENTS.
> 
> For example, to apply this pattern to a simple bank-account scenario I can
> have:
> - operations stream as "commands" (requests to deposit or withdraw an
> amount of money, eg. "deposit $10" => Operation(+10) )
> - movements stream as "events" (actual deposit or withdraw event, eg. "$10
> deposited" => Movement(+10) )
> - account table as a "snapshots" (account balance, eg. "$20 in account
> balance" => Account(20) )
> - account id is used as key for all topics and tables
> 
> The topology can be written like:
> 
> case class Operation(amount: Int)
> case class Movement(amount: Int, error: String = "")
> case class Account(balance: Int)
> 
> // events
> val movementsStream = streamBuilder.stream[String,
> Movement](Config.topicMovements)
> // snapshots
> val accountTable = movementsStream
>   .groupByKey
>   .aggregate(Account(0)){ (_, movement, account) =>
> account.copy(balance = account.balance + movement.amount)
>   }
> accountTable.toStream.to(Config.topicAccounts)
> // commands
> val operationsStream = streamBuilder.stream[String,
> Operation](Config.topicOperations)
> operationsStream
>   .leftJoin(accountTable) { (operation, accountOrNull) =>
> val account = Option(accountOrNull).getOrElse(Account(0))
> if (account.balance >= -operation.amount) {
>   Movement(operation.amount)
> } else {
>   Movement(0, error = "insufficient funds")
> }
>   }
>   .to(Config.topicMovements)
> 
> (see full code here:
> https://github.com/davideicardi/es4kafka/blob/master/examples/bank-account/src/main/scala/bank/StreamingPipeline.scala
> )
> 
> Now let's imagine a scenario where I deposit $ 10, then I deposit again $
> 10 and then I withdraw $ 20:
>  

Re: kafka-streams: interaction between max.poll.records and window expiration ?

2020-12-21 Thread John Roesler
Hi Mathieu,

I don’t think there would be any problem. Note that window expiry is computed 
against an internal clock called “stream time”, which is the max timestamp yet 
observed. This time is advanced per each record when that record is processed. 
There is a separate clock for each partition, so they will not affect each 
other.

I hope this helps,
John

On Sun, Dec 20, 2020, at 08:22, Mathieu D wrote:
> Hello there,
> 
> One of our input topics does not have so much traffic.
> Divided by the number of partitions, and given the default 'max.poll.records'
> setting (being 1000 if I understand the doc correctly), it could happen
> that fetching 1000 records at once, the event timestamps between the first
> and last record in the "batch" could be larger than some windows in my
> topology.
> 
> Could this have any impact on window expiration ?
> 
> Thanks
> Mathieu
>


Re: In Memory State Store

2020-12-21 Thread John Roesler
Hi Navneeth,

Yes, you are correct. I think there are some opportunities for improvement 
there, but there are also reasons for it to be serialized in the in-memory 
store. 

Off the top of my head, we need to serialize stored data anyway to send it to 
the changelog. Also, even though the store is in memory, there is still a cache 
(which helps reduce the update rate downstream); the cache stores serialized 
data because it’s the same cache covering all store types, and because it 
allows us to estimate and bound memory usage. Also, there are times when we 
need to determine if the current value is the same as the prior value; and 
comparing serialized forms is more reliable than using the equals() method. 
Round-tripping through serialization also helps to limit the scope of mutable 
object bugs, which I have seen in some user code. Finally, it simplifies the 
overall API and internals to have one kind of store to plug in, rather than to 
have some byte stores and some object stores.

All that aside, I’ve always felt that it could be a significant performance 
advantage to do what you suggest. If you feel passionate about it and want to 
do some experiments, I’d be happy to provide some guidance. Just let me know!

Thanks,
John

On Sun, Dec 20, 2020, at 14:27, Navneeth Krishnan wrote:
> Hi All,
> 
> I have a question about the inMemoryKeyValue store. I was under the
> assumption that in-memory stores would not serialize the objects but when I
> looked into the implementation I see InMemoryKeyValueStore uses a
> NavigableMap of bytes which indicates the user objects have to be
> serialized and stored.
> 
> Am I missing something? Wouldn't this cause more serialization overhead for
> storing something in memory?
> 
> In my case I have a punctuator which reads all the entries in the state
> store and forwards the data. When there are around 10k entries it takes
> about 400ms to complete. I was trying to optimize this problem. I use kryo
> serde and the objects are bigger in size (approx 500 bytes).
> 
> Regards,
> Navneeth
>


Re: Punctuate NPE

2020-12-15 Thread John Roesler
Hi Navneeth,

I'm sorry for the trouble.

Which version of Streams are you using? Also, this doesn't
look like the full stacktrace, since we can't see the NPE
itself. Can you share the whole thing?

Thanks,
-John


On Tue, 2020-12-15 at 00:30 -0800, Navneeth Krishnan wrote:
> Hi All,
> 
> I have a scheduled function that runs every 10 seconds and in some cases I
> see this NPE. Not sure how to debug this issue. Any pointers would really
> help. Thanks
> 
> context.schedule(this.scheduleInterval, PunctuationType.STREAM_TIME,
> this::flush);
> 
> 
> 2020-12-15 07:40:14.214
> [userapp-c2db617d-ed40-4c9a-a3b3-e9942c19d28a-StreamThread-4] ERROR
> Pipeline -
> org.apache.kafka.streams.errors.StreamsException: stream-thread
> [userapp-c2db617d-ed40-4c9a-a3b3-e9942c19d28a-StreamThread-4] task [12_16]
> Exception caught while punctuating processor 'Window_processor'
> at
> org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:760)
> ~[kafka-streams-2.6.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54)
> ~[kafka-streams-2.6.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateStreamTime(StreamTask.java:941)
> ~[kafka-streams-2.6.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:1066)
> ~[kafka-streams-2.6.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:707)
> ~[kafka-streams-2.6.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
> ~[kafka-streams-2.6.0.jar!/:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> ~[kafka-streams-2.6.0.jar!/:?]
> 
> Thanks,
> Navneeth




Re: Apache Kafka Powered By : Proposition La Redoute

2020-12-15 Thread John Roesler
Hi Antoine,

Thanks for your message!

I couldn't see the logo; I think the mailing list software
doesn't transmit attachments.

Perhaps you could just send a pull request to add La Redoute
to
https://github.com/apache/kafka-site/edit/asf-site/powered-by.html
?

Feel free to reply here if you have any trouble with it.

Thanks,
-John

On Thu, 2020-12-10 at 20:47 +, CRASKE Antoine wrote:
> Hi there,
>  
> We would like to propose La Redoute to appear as users and
> “powered by” Apache Kafka
> :https://kafka.apache.org/powered-by
>  
> We have been using it for more than 2 years now. Here is the proposed
> text, also for evaluation on your end, and in attachment
> various logo formats,
>  
> “
> La Redoute, the digital platform for families, uses Kafka
> as a central nervous system to decouple its application
> through business events.
>  
> Thus, enabling a decentralized event-driven architecture
> bringing near-real-time data reporting, analytics and
> emerging AI-pipelines combining Kafka Connect, Kafka
> Streams and KSQL.
> “
>  
> Hoping this was the right email to send to,
>  
> Best,
> Antoine
>  
>  




Re: [VOTE] 2.7.0 RC5

2020-12-14 Thread John Roesler
Thanks for this release, Bill,

I ran through the quickstart (just the zk, broker, and
console clients part), verified the signatures, and also
built and ran the tests.

I'm +1 (binding).

Thanks,
-John

On Mon, 2020-12-14 at 14:58 -0800, Guozhang Wang wrote:
> I checked the docs and ran unit tests, no red flags found. +1.
> 
> On Fri, Dec 11, 2020 at 5:45 AM Bill Bejeck  wrote:
> 
> > Updated with link to successful Jenkins build.
> > 
> > * Successful Jenkins builds for the 2.7 branch:
> >  Unit/integration tests:
> > 
> > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-2.7-jdk8/detail/kafka-2.7-jdk8/78/
> > 
> > On Thu, Dec 10, 2020 at 5:17 PM Bill Bejeck  wrote:
> > 
> > > Hello Kafka users, developers and client-developers,
> > > 
> > > This is the sixth candidate for release of Apache Kafka 2.7.0.
> > > 
> > > * Configurable TCP connection timeout and improve the initial metadata
> > > fetch
> > > * Enforce broker-wide and per-listener connection creation rate (KIP-612,
> > > part 1)
> > > * Throttle Create Topic, Create Partition and Delete Topic Operations
> > > * Add TRACE-level end-to-end latency metrics to Streams
> > > * Add Broker-side SCRAM Config API
> > > * Support PEM format for SSL certificates and private key
> > > * Add RocksDB Memory Consumption to RocksDB Metrics
> > > * Add Sliding-Window support for Aggregations
> > > 
> > > This release also includes a few other features, 53 improvements, and 84
> > > bug fixes.
> > > 
> > > Release notes for the 2.7.0 release:
> > > https://home.apache.org/~bbejeck/kafka-2.7.0-rc5/RELEASE_NOTES.html
> > > 
> > > *** Please download, test and vote by Friday, December 18, 12 PM ET ***
> > > 
> > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > https://kafka.apache.org/KEYS
> > > 
> > > * Release artifacts to be voted upon (source and binary):
> > > https://home.apache.org/~bbejeck/kafka-2.7.0-rc5/
> > > 
> > > * Maven artifacts to be voted upon:
> > > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > > 
> > > * Javadoc:
> > > https://home.apache.org/~bbejeck/kafka-2.7.0-rc5/javadoc/
> > > 
> > > * Tag to be voted upon (off 2.7 branch) is the 2.7.0 tag:
> > > https://github.com/apache/kafka/releases/tag/2.7.0-rc5
> > > 
> > > * Documentation:
> > > https://kafka.apache.org/27/documentation.html
> > > 
> > > * Protocol:
> > > https://kafka.apache.org/27/protocol.html
> > > 
> > > * Successful Jenkins builds for the 2.7 branch:
> > > Unit/integration tests: Link to follow
> > > 
> > > Thanks,
> > > Bill
> > > 
> > 
> 
> 




Re: Kafka Streams: Unexpected data loss

2020-11-20 Thread John Roesler
Hello Jeffrey,

I’m sorry for the trouble. I appreciate your diligence in tracking this down. 
In reading your description, nothing jumps out to me as problematic. I’m a bit 
at a loss as to what may have been the problem. 


>- Is there a realistic scenario (e.g. crash, rebalance) which you can
>think of where the offset has been committed, but the repartition was not?
I don’t think so. We wait for acks before committing, so it should be safe. 
Maybe double-check that the producer is set to acks=all?
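
For reference, a minimal sketch of overriding the internal producer's acks from the Streams config (application id and bootstrap servers are placeholders):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class AcksAllConfig {
    // Sketch: make the internal Streams producer wait for all in-sync replicas.
    static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
        return props;
    }
}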

>- Similarly, is there a realistic scenario (e.g. crash, rebalance) which
>you can think of where the offset has been committed, but the changelog was
>not?
Changelogs ought to be exactly the same mechanism as repartition topics. 

>- When caching the output of a KTable, are the corresponding offsets
>committed when the cache is flushed, or are they eligible to be committed
>as soon as the record is added to the cache?
The order of operations is to flush the caches, wait for acks, and then commit. 

>- Could it be a race-condition on the update of the state store? I.e.
>given 2 back-to-back messages for the same key, could an aggregation
>function handle both based on the old value?
This really shouldn’t be possible. Each task runs single-threaded.

Of course, that is all the intent of the code. There may be a bug that 
invalidates one of those responses. But I don’t think we know of other 
occasions of this happening.

It sounds like you don’t need general advice, but I feel compelled to offer it. 
The best I can think of is to keep a really close eye on the app. If you can 
catch the problem right away, you might be able to correlate it with the 
application logs, look in the topic partitions, etc. 

I hope this helps,
John

On Fri, Nov 20, 2020, at 14:39, Jeffrey Goderie wrote:
> Hi all,
> 
> We recently started using Kafka Streams and we encountered an unexpected
> issue with our Streams application. Using the following topology we ran
> into data loss:
> 
> Topologies:
>Sub-topology: 0
> Source: meteringpoints-source (topics:
> [serving.meteringpoints.mad-meteringpoint])
>   --> meteringpoints
> Processor: meteringpoints (stores: [meteringpoints])
>   --> KTABLE-SELECT-02
>   <-- meteringpoints-source
> Processor: KTABLE-SELECT-02 (stores: [])
>   --> KSTREAM-SINK-03
>   <-- meteringpoints
> Sink: KSTREAM-SINK-03 (topic:
> meteringpointsbyzipcode-repartition)
>   <-- KTABLE-SELECT-02
> 
>   Sub-topology: 1
> Source: KSTREAM-SOURCE-04 (topics:
> [meteringpointsbyzipcode-repartition])
>   --> KTABLE-AGGREGATE-05
> Processor: KTABLE-AGGREGATE-05 (stores:
> [meteringpointsbyzipcode])
>   --> none
>   <-- KSTREAM-SOURCE-04
> 
> The topology is optimized, so the 'meteringpoints' store does not have an
> additional changelog. The 'meteringpointsbyzipcode' store does have one.
> 
> During the aggregation (KTABLE-AGGREGATE-05) we build up a set of
> objects that we encountered for that specific key. Upon inspection we
> noticed that some of our keys did not have all of the expected objects in
> their associated value.
> 
> Configuration-wise: our cluster consists of 3 brokers, and the topics
> (regular and internal) are replicated over all of them. We don't have EOS
> enabled, as our aggregation is idempotent. Our consumers' isolation level
> is 'read_uncommited', which we thought was irrelevant as 'at_least_once'
> delivery doesn't seem to use Kafka transactions. The amount of consumers is
> equal to the amount of partitions in each topic, so each consumer deals
> with a single partition for each topic.
> 
> Unfortunately, both the repartition topic and the changelog topic were
> wiped before we were able to investigate what caused the issue. Because of
> this we are unable to determine whether the problem originated in the
> changelog or the repartition topic. Resetting the application (and its
> offsets) caused all data to be reprocessed after which the issue was gone.
> We tried reproducing the erroneous scenario, but have not yet succeeded, up
> mostly because we don't know what events caused it in the first place.
> 
> Since we are seemingly unable to reproduce the behaviour, and didn't find
> any recent accounts of similar problems, we decided to look into the Kafka
> Streams source code to determine how things function. While insightful, it
> didn't help in determining the cause. As such, we were wondering whether
> you could aid us by providing an answer to some of the following questions?
> 
>- Is there a realistic scenario (e.g. crash, rebalance) which you can
>think of where the offset has been committed, but the repartition was not?
>- Similarly, is there a realistic scenario (e.g. crash, rebalance) which
>you can think of where the offset has been committed, but the changelog was
>not?
>- 

Re: kafka-streams / window expiration because of shuffling

2020-11-20 Thread John Roesler
Hi Mathieu,

Ah, that is unfortunate. I believe your analysis is correct. In general, we 
have no good solution to the problem of upstream tasks moving ahead of each 
other and causing disorder in the repartition topics. Guozhang has done a 
substantial amount of thinking on this subject, though, and has some ideas for 
how we can improve the behavior. 

However, your situation is a special case, since the data actually doesn’t need 
to be shuffled. Ideally, there would be a way to say “I assert that the 
partitioning doesn’t need to change despite this key change”. That actually 
seems like a good feature for the repartition operator.

In the absence of that feature, I think you’re on the right track. If you 
specify a partitioner that produces exactly the same partitioning as the 
source, you _should_ be able to avoid any shuffling, although there will still 
be a repartition topic there. 
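
To sketch the idea (the value type, the metric accessor, and the assumption that the upstream producer partitioned on deviceId with the default murmur2 partitioner are all hypothetical):

import java.nio.charset.StandardCharsets;
import java.util.function.Function;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

public class KeepDevicePartitioning {

    // Re-key to "deviceId|metric" but keep partitioning by deviceId only,
    // so records stay in the same partition they arrived in.
    static <V> KStream<String, V> rekeyByDeviceAndMetric(
            final KStream<String, V> byDevice,
            final Function<V, String> metricOf,
            final Serde<V> valueSerde) {

        return byDevice
            .selectKey((deviceId, value) -> deviceId + "|" + metricOf.apply(value))
            .repartition(Repartitioned
                .<String, V>with(Serdes.String(), valueSerde)
                .withStreamPartitioner((topic, key, value, numPartitions) -> {
                    // partition on the deviceId prefix so the layout matches the source topic
                    final String deviceId = key.substring(0, key.indexOf('|'));
                    return Utils.toPositive(
                        Utils.murmur2(deviceId.getBytes(StandardCharsets.UTF_8))) % numPartitions;
                }));
    }
}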

I hope this helps,
John

On Fri, Nov 20, 2020, at 04:14, Mathieu D wrote:
> Hello there,
> 
> We're processing IOT device data, each device sending several metrics.
> 
> When we upgrade our streams app, we set a brand new 'application.id' to
> reprocess a bit of past data, to warm up the state stores and aggregations
> to make sure all outputs will be valid. Downstream is designed for "at
> least once" so no problem with this bit of reprocessing.
> 
> When this restart+reprocessing occurs, we observe a peak of "Skipping
> record for expired window" / "Skipping record for expired segment" warning
> in logs.
> 
> My understanding so far is this:
> - a part of our topology is keyed by deviceId.
> - during the reprocessing, some tasks are moving faster for some
> partitions, which means there's a substantial difference between the
> various stream-times across tasks
> - at some point in the topology, we re-key the data by (deviceId, metric)
> for "group by metric" aggregations
> - this shuffles the data:  deviceId1 was in partition 1 with eventTime1,
> deviceId2 was in partition 2 with eventTime2, and now by the magic of
> hashing a (device,metric) key, they are pushed together in the same
> partitionX. If eventTime2 is far ahead of eventTime1, then all windows
> will  expire at once.
> 
> Is this analysis correct ?
> 
> Then, what's the proper way to avoid this ? Manually do a .repartition()
> with a custom partitioner after each .selectKey((device, metric)), and
> before going through aggregations ?
> 
> Any other advice ?
> 
> Thanks for your insights
> 
> Mathieu
>


Re: Kafka Streams: SessionWindowedSerde Vs TimeWindowedSerde. Ambiguous implicit values

2020-11-19 Thread John Roesler
Oh, nice. Thanks, Daniel!

That’s much nicer than my ham-handed approach. 

Thanks,
John

On Thu, Nov 19, 2020, at 17:44, Daniel Hinojosa wrote:
> Hope this helps, I tried copying your code into a sample application. I got
> it to compile with the implicits all resolving. I think the trick was there
> were two implementations for Windowing Serdes.  You just need to block one
> from the imports.  See if that fits with what you are doing.  Oh also, I
> noticed that the types were not resolving when calling builder.stream, so I
> put [String, String] in the builder.  Here is a gist, which formats better.
> 
> https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
> 
> import java.time.Duration
> import java.util.Properties
> 
> import org.apache.kafka.common.config.Config
> import org.apache.kafka.streams.Topology
> import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
> import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
> import org.apache.kafka.streams.scala.kstream.{Consumed, Grouped,
> Materialized, Produced}
> import org.apache.kafka.streams.scala.{ByteArraySessionStore, StreamsBuilder}
> 
> class SampleStream {
>   def createTopology(conf: Config, properties: Properties): Topology = {
> 
> implicit val produced: Produced[Windowed[String], Long] =
>   Produced.`with`[Windowed[String], Long]
> 
> implicit val grouped: Grouped[String, String] =
>   Grouped.`with`[String, String]
> 
> implicit val consumed: Consumed[String, String] =
>   Consumed.`with`[String, String]
> 
> implicit val materialized: Materialized[String, Long,
> ByteArraySessionStore] = Materialized.`with`[String, Long,
> ByteArraySessionStore]
> 
> val builder: StreamsBuilder = new StreamsBuilder()
> 
> builder
>   .stream[String, String]("streams-plaintext-input")
>   .groupBy((_, word) => word)
>   .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
>   .count()
>   .toStream
>   .to("streams-pipe-output")
> 
> builder.build()
>   }
> }
> 
> 
> 
> 
> On Thu, Nov 19, 2020 at 7:24 AM John Roesler  wrote:
> 
> > Hi Eric,
> >
> > Sure thing. Assuming the definition of ‘produced’ you had tried in your
> > code, it’s just:
> >
> > ...
> > .toStream.to("streams-pipe-output")(produced)
> >
> > As far as the json serde goes, I think that I wrote an example of using
> > Jackson to implement a serde in Confluent’s kafka-streams-examples repo.
> > I’m not sure what other/better examples
> > might be out there.
> >
> > Hope this helps,
> > John
> >
> > On Thu, Nov 19, 2020, at 00:27, Eric Beabes wrote:
> > > Not sure what you mean by "pass it explicitly". The definition of 'to' is
> > > given below. Can we pass it explicitly in this case. If yes, can you
> > please
> > > show me how?
> > >
> > > def to(topic: String)(implicit produced: Produced[K, V]): Unit =
> > >   inner.to(topic, produced)
> > >
> > >
> > > Also not sure how to use a self documenting format like JSON. Any
> > > examples to share?
> > >
> > >
> > > On Wed, Nov 18, 2020 at 5:14 PM John Roesler 
> > wrote:
> > >
> > > > Hi Eric,
> > > >
> > > > Ah, that’s a bummer. The correct serde is the session windowed serde,
> > as I
> > > > can see you know. I’m afraid I’m a bit rusty on implicit resolution
> > rules,
> > > > so I can’t be much help there.
> > > >
> > > > But my general recommendation for implicits is that when things get
> > weird,
> > > > just don’t use them at all. For example, you can just explicitly pass
> > the
> > > > Produced in the second arg list of ‘to’.
> > > >
> > > > One other tip is that the serialized form produced by those serdes is
> > kind
> > > > of specialized and might not be the most convenient for your use. If
> > this
> > > > is just a POC, if suggest mapping the keys to strings, so they are
> > > > human-readable. If this is a production use case, then you might want
> > to
> > > > use a more self-documenting format like JSON or AVRO. Just my two
> > cents.
> > > >
> > > > I hope this helps!
> > > > -John
> > > >
> > > > On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
> > > > > I keep getting '*ambiguous implicit values*' message in the following
> > > > code.

Re: Kafka Streams: SessionWindowedSerde Vs TimeWindowedSerde. Ambiguous implicit values

2020-11-19 Thread John Roesler
Hi Eric,

Sure thing. Assuming the definition of ‘produced’ you had tried in your code, 
it’s just:

...
.toStream.to("streams-pipe-output")(produced)

As far as the json serde goes, I think that I wrote an example of using Jackson 
to implement a serde in Confluent’s kafka-streams-examples repo. I’m not sure 
what other/better examples
might be out there. 
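
Roughly, such a serde can be as small as this (a bare-bones Java sketch; error handling and configuration are omitted):

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerdes {
    // Minimal Jackson-backed serde for any POJO type.
    public static <T> Serde<T> jsonSerde(final Class<T> type) {
        final ObjectMapper mapper = new ObjectMapper();
        final Serializer<T> serializer = (topic, data) -> {
            try {
                return data == null ? null : mapper.writeValueAsBytes(data);
            } catch (final Exception e) {
                throw new RuntimeException(e);
            }
        };
        final Deserializer<T> deserializer = (topic, bytes) -> {
            try {
                return bytes == null ? null : mapper.readValue(bytes, type);
            } catch (final Exception e) {
                throw new RuntimeException(e);
            }
        };
        return Serdes.serdeFrom(serializer, deserializer);
    }
}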

Hope this helps,
John

On Thu, Nov 19, 2020, at 00:27, Eric Beabes wrote:
> Not sure what you mean by "pass it explicitly". The definition of 'to' is
> given below. Can we pass it explicitly in this case. If yes, can you please
> show me how?
> 
> def to(topic: String)(implicit produced: Produced[K, V]): Unit =
>   inner.to(topic, produced)
> 
> 
> Also not sure how to use a self documenting format like JSON. Any
> examples to share?
> 
> 
> On Wed, Nov 18, 2020 at 5:14 PM John Roesler  wrote:
> 
> > Hi Eric,
> >
> > Ah, that’s a bummer. The correct serde is the session windowed serde, as I
> > can see you know. I’m afraid I’m a bit rusty on implicit resolution rules,
> > so I can’t be much help there.
> >
> > But my general recommendation for implicits is that when things get weird,
> > just don’t use them at all. For example, you can just explicitly pass the
> > Produced in the second arg list of ‘to’.
> >
> > One other tip is that the serialized form produced by those serdes is kind
> > of specialized and might not be the most convenient for your use. If this
> > is just a POC, if suggest mapping the keys to strings, so they are
> > human-readable. If this is a production use case, then you might want to
> > use a more self-documenting format like JSON or AVRO. Just my two cents.
> >
> > I hope this helps!
> > -John
> >
> > On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
> > > I keep getting '*ambiguous implicit values*' message in the following
> > code.
> > > I tried several things (as can be seen from a couple of lines I've
> > > commented out). Any ideas on how to fix this? This is in *Scala*.
> > >
> > >  def createTopology(conf: Config, properties: Properties): Topology =
> > > {//implicit val sessionSerde =
> > > Serde[WindowedSerdes.SessionWindowedSerde[String]]//implicit val
> > > produced: Produced[Windowed[String], Long] =
> > > Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
> > > implicit val produced: Produced[Windowed[String], Long] =
> > > Produced.`with`[Windowed[String], Long]
> > > implicit val consumed: Consumed[String, String] =
> > > Consumed.`with`[String, String]
> > >
> > > val builder: StreamsBuilder = new StreamsBuilder()
> > > builder.stream("streams-plaintext-input")
> > > .groupBy((_, word) => word)
> > > .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
> > > .count()
> > > .toStream.to("streams-pipe-output")
> > >
> > > builder.build()
> > >
> > >   }
> > >
> > > *Compiler Errors:*
> > >
> > > Error:(52, 78) ambiguous implicit values:
> > >  both method timeWindowedSerde in object Serdes of type [T](implicit
> > > tSerde:
> > >
> > org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
> > >  and method sessionWindowedSerde in object Serdes of type [T](implicit
> > > tSerde:
> > >
> > org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
> > >  match expected type
> > >
> > org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> > > implicit val produced: Produced[Windowed[String], Long] =
> > > Produced.`with`[Windowed[String], Long]
> > > Error:(52, 78) could not find implicit value for parameter keySerde:
> > >
> > org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> > > implicit val produced: Produced[Windowed[String], Long] =
> > > Produced.`with`[Windowed[String], Long]
> > > Error:(52, 78) not enough arguments for method with: (implicit
> > > keySerde:
> > >
> > org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]],
> > > implicit valueSerde:
> > >
> > org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].Unspecified
> > > value parameters keySerde, valueSerde.
> > > implicit val produced: Produced[Windowed[String], Long] =
> > > Produced.`with`[Windowed[String], Long]
> > >
> >
>


Re: Kafka Streams: SessionWindowedSerde Vs TimeWindowedSerde. Ambiguous implicit values

2020-11-18 Thread John Roesler
Hi Eric,

Ah, that’s a bummer. The correct serde is the session windowed serde, as I can 
see you know. I’m afraid I’m a bit rusty on implicit resolution rules, so I 
can’t be much help there. 

But my general recommendation for implicits is that when things get weird, just 
don’t use them at all. For example, you can just explicitly pass the Produced 
in the second arg list of ‘to’. 

One other tip is that the serialized form produced by those serdes is kind of 
specialized and might not be the most convenient for your use. If this is just 
a POC, I'd suggest mapping the keys to strings, so they are human-readable. If 
this is a production use case, then you might want to use a more 
self-documenting format like JSON or AVRO. Just my two cents. 

I hope this helps!
-John 

On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
> I keep getting '*ambiguous implicit values*' message in the following code.
> I tried several things (as can be seen from a couple of lines I've
> commented out). Any ideas on how to fix this? This is in *Scala*.
> 
>  def createTopology(conf: Config, properties: Properties): Topology = {
>     // implicit val sessionSerde = Serde[WindowedSerdes.SessionWindowedSerde[String]]
>     // implicit val produced: Produced[Windowed[String], Long] =
>     //   Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
>     implicit val produced: Produced[Windowed[String], Long] =
>       Produced.`with`[Windowed[String], Long]
>     implicit val consumed: Consumed[String, String] =
>       Consumed.`with`[String, String]
> 
> val builder: StreamsBuilder = new StreamsBuilder()
> builder.stream("streams-plaintext-input")
> .groupBy((_, word) => word)
> .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
> .count()
> .toStream.to("streams-pipe-output")
> 
> builder.build()
> 
>   }
> 
> *Compiler Errors:*
> 
> Error:(52, 78) ambiguous implicit values:
>  both method timeWindowedSerde in object Serdes of type [T](implicit
> tSerde: 
> org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
>  and method sessionWindowedSerde in object Serdes of type [T](implicit
> tSerde: 
> org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
>  match expected type
> org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> implicit val produced: Produced[Windowed[String], Long] =
> Produced.`with`[Windowed[String], Long]
> Error:(52, 78) could not find implicit value for parameter keySerde:
> org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> implicit val produced: Produced[Windowed[String], Long] =
> Produced.`with`[Windowed[String], Long]
> Error:(52, 78) not enough arguments for method with: (implicit
> keySerde: 
> org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]],
> implicit valueSerde:
> org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].Unspecified
> value parameters keySerde, valueSerde.
> implicit val produced: Produced[Windowed[String], Long] =
> Produced.`with`[Windowed[String], Long]
>


Re: StreamBuilder#stream multiple topic and consumed api

2020-11-13 Thread John Roesler
Hi Uirco,

The method that doesn’t take Consumed will fall back to the configured “default 
serdes”. If you don’t have that confit set, it will just keep them as byte 
arrays, which will probably give you an exception at runtime. You’ll probably 
want to use the Consumed argument to set your serdes. 

I’m assuming from your question that the serialized data in your two topics 
have different formats, but that your serdes for them produce the same result 
type. That is, the resulting stream would have just one type in it.

I think you have two options, you can either create a “wrapper deserializer” 
that checks the input topic for each record and then delegates to the 
appropriate deserializer, or you can create two different streams for the two 
topics and then use the ‘merge’ operator to combine them into one stream.

Offhand, I don’t think there would be much of a performance difference among 
any of the options, so I’d suggest going for the approach that looks the 
cleanest to you.
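
For instance, the merge option looks roughly like this (topic names and the MyEvent placeholder type are invented):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class MergeTwoTopics {
    // Two source streams with different serdes, merged into one stream of a common type.
    static KStream<String, MyEvent> build(final StreamsBuilder builder,
                                          final Serde<MyEvent> serdeA,
                                          final Serde<MyEvent> serdeB) {
        final KStream<String, MyEvent> a =
            builder.stream("topic-a", Consumed.with(Serdes.String(), serdeA));
        final KStream<String, MyEvent> b =
            builder.stream("topic-b", Consumed.with(Serdes.String(), serdeB));
        return a.merge(b); // one stream containing records from both topics
    }

    // Placeholder value type, standing in for whatever both serdes produce.
    static class MyEvent {}
}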

Regarding the last question, I’m not aware of any plans, but if you think that 
would be a better API, you are welcome to propose it in a KIP!

I hope this helps,
John

On Thu, Nov 12, 2020, at 22:00, Uirco Aoi wrote:
> has anyone used public synchronized <K, V> KStream<K, V> stream(final 
> Collection<String> topics)?
> my use case is I have one stream but with two source input using 
> different serde.
> looks like the best way is to use this stream API, but I have some 
> concerns
> 1. currently the only API I can use is 
> public synchronized <K, V> KStream<K, V> stream(final 
> Collection<String> topics)
> does it means after I got it, I have to deserialize the data myself?
> and will it slow down the performance, compare to using it with 
> consumed.with ?
> 2. will it in the future support public synchronized <K, V> KStream<K, V> stream(final Collection<TopicsWithConsumed<K, V>>)
> class TopicsWithConsumed pair{
> final String topic
> final Consumed consumed
> }
>


Re: started getting TopologyException: Invalid topology after moving to streams-2.5.1

2020-10-05 Thread John Roesler
Hi Pushkar,

Sorry for the trouble. Can you share your config and
topology description?
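
If it helps, you can print the description with something like this (assuming you still have your StreamsBuilder in hand):

Topology topology = streamsBuilder.build();
System.out.println(topology.describe());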

If I read your error message correctly, it says that your
app is configured with no source topics and no threads. Is
that accurate?

Thanks,
-John

On Mon, 2020-10-05 at 15:04 +0530, Pushkar Deole wrote:
> Hi All,
> 
> After moving to kafka-streams-2.5.1 version, one of our services started
> failing with below exception. Any idea what this is about and why it was
> passing with 2.5.0? Any changes made in 2.5.1 that is breaking this?
> 
> Exception in thread "main"
> org.springframework.context.ApplicationContextException: Failed to start
> bean 'defaultKafkaStreamsBuilder'; nested exception is
> org.springframework.kafka.KafkaException: Could not start stream: ; nested
> exception is org.apache.kafka.streams.errors.TopologyException: Invalid
> topology: Topology has no stream threads and no global threads, must
> subscribe to at least one source topic or global table.
> 
> at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
> at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
> at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
> at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
> at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
> at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895)
> at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554)
> at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)
> at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758)
> at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750)
> at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
> at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
> at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)
> at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
> at com.avaya.analytics.AnalyticsStreamsDataPublisherApplication.main(AnalyticsStreamsDataPublisherApplication.java:31)
> 
> Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
> 
> at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:326)
> at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
> ... 14 more
> 
> Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
> 
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:728)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:587)
> at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:311)
> ... 15 more



Re: Kafka stream error - Consumer is not subscribed to any topics or assigned any partitions

2020-09-14 Thread John Roesler
Hi Pushkar,

I'd recommend always keeping Streams and the Clients at the
same version, since we build, test, and release them
together. FWIW, I think there were some bugfixes for the
clients in 2.5.1 anyway.

Thanks,
-John

On Mon, 2020-09-14 at 20:08 +0530, Pushkar Deole wrote:
> Sophie, one more question: will just upgrading kafka-streams jar to 2.5.1
> will work or we need to other jars also to be upgraded to 2.5.1 e.g.
> kafka-clients etc. ?
> 
> On Mon, Sep 14, 2020 at 7:16 PM Pushkar Deole  wrote:
> 
> > Thanks Sophie... if we are just creating a global state store
> > (GlobalKTable for instance) from a topic, then that is what you are calling
> > as global-only topology. In our application that is what we are doing and
> > there is no source topic for the stream to process data from, i mean there
> > is however it is done through a consumer-producer kind of design and not
> > through stream topology.
> > 
> > On Fri, Sep 11, 2020 at 10:58 PM Sophie Blee-Goldman 
> > wrote:
> > 
> > > You should upgrade to 2.5.1, it contains a fix for this.
> > > 
> > > Technically the "fix" is just to automatically set the num.stream.threads
> > > to 0
> > > when a global-only topology is detected, so setting this manually would
> > > accomplish the same thing. But the fix also includes a tweak of the
> > > KafkaStreams state machine to make sure it reaches the RUNNING state
> > > even with no stream threads. So if you use a state listener, you'll want
> > > to
> > > use 2.5.1
> > > 
> > > It's always a good idea to upgrade when a new bugfix version is released
> > > anyway
> > > 
> > > On Fri, Sep 11, 2020 at 5:15 AM Pushkar Deole 
> > > wrote:
> > > 
> > > > Hi All,
> > > > 
> > > > I upgraded from Kafka streams 2.4 to 2.5.0 and one of the applications
> > > > suddenly stopped working with the error message:
> > > > 
> > > > Exception in thread
> > > > "DsiApplication-0fcde033-dab2-431c-9d82-76e85fcb4c91-StreamThread-1"
> > > > java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
> > > > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1228)
> > > > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
> > > > at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
> > > > at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
> > > > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
> > > > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> > > > This application uses streams just to create a global state store from a
> > > > topic in order to create a global state store as a cache for static data
> > > > across application instances and the stream doesn't consume from any
> > > input
> > > > topic. Came across following thread on stackoverflow
> > > > 
> > > > 
> > > https://stackoverflow.com/questions/61342530/kafka-streams-2-5-0-requires-input-topic
> > > > Matthias, I see you have answered some queries there, so would like to
> > > > confirm if setting num.stream.threads to 0 will solve this issue?
> > > > 



Re: Handle exception in kafka stream

2020-09-01 Thread John Roesler
Hi Deepak,

It sounds like you're saying that the exception handler is
correctly indicating that Streams should "Continue", and
that if you stop the app after handling an exceptional
record but before the next commit, Streams re-processes the
record?

If that's what you're seeing, then it's how the system is
designed to operate. We don't commit after every record
because the overhead would be enormous. If you can't
tolerate seeing duplicates in your error file, then it
sounds like the simplest thing for you would be to maintain
an index of the records you have already saved off so that
you can gracefully handle re-processing. E.g., you might
have a separate file per topic-partition that you update
after appending to your error log to indicate the highest
offset you've handled. Then, you can read it from the
exception handler to see if the record you're handling is
already logged. Just an idea.
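
A rough sketch of that idea (the config key, directory layout, and class names below are all made up; adapt them to however you are writing your error file):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class DedupingDeserializationExceptionHandler implements DeserializationExceptionHandler {

    private Path indexDir;

    @Override
    public void configure(final Map<String, ?> configs) {
        // hypothetical custom config key for where the per-partition offset index lives
        indexDir = Paths.get(String.valueOf(configs.getOrDefault("bad.record.index.dir", "/tmp/bad-records")));
    }

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        try {
            Files.createDirectories(indexDir);
            final Path index = indexDir.resolve(record.topic() + "-" + record.partition() + ".offset");
            final long lastLogged = Files.exists(index)
                ? Long.parseLong(new String(Files.readAllBytes(index)).trim())
                : -1L;
            if (record.offset() > lastLogged) {
                appendToErrorFile(record); // your existing "write the bad message to a file" logic
                Files.write(index, String.valueOf(record.offset()).getBytes());
            }
        } catch (final IOException e) {
            // if the bookkeeping itself fails, you may prefer to return FAIL here
        }
        return DeserializationHandlerResponse.CONTINUE;
    }

    private void appendToErrorFile(final ConsumerRecord<byte[], byte[]> record) {
        // placeholder
    }
}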

I hope this helps,
-John

On Tue, 2020-09-01 at 16:36 +0530, Deepak Raghav wrote:
> Hi Team
> 
> Just a reminder.
> Can you please help me with this?
> 
> Regards and Thanks
> Deepak Raghav
> 
> 
> 
> On Tue, Sep 1, 2020 at 1:44 PM Deepak Raghav 
> wrote:
> 
> > Hi Team
> > 
> > I have created a CustomExceptionHandler class by
> > implementing DeserializationExceptionHandler interface to handle the
> > exception during deserialization time.
> > 
> > But the problem with this approach is that if there is some exception
> > raised for some record and after that stream is stopped and
> > restarted again, it reads those bad messages again.
> > 
> > I am storing those bad messages in some file in the filesystem and with
> > this approach, duplicate messages are getting appended in the file when the
> > stream is started since those bad message's offset are not getting
> > increased.
> > 
> > Please let me know if I missed anything.
> > 
> > Regards and Thanks
> > Deepak Raghav
> > 
> > 



Re: JNI linker issue on ARM (Raspberry PI)

2020-08-24 Thread John Roesler
Ah, yes, before messing with rocksjni. As Sophie said, you'll probably just 
want to try using 2.6.0. It should work "out of the box".

Thanks,
-John

On Mon, Aug 24, 2020, at 21:08, Steve Jones wrote:
> Version: *kafka_2.12-2.5.0*
> 
> RocksDB compiles fine, Kafka compiles fine, I'm just trying to work out
> where the linking is done and if that can be changed to a static rather
> than a dynamic link.  Any hints appreciated, and I'll document out what
> I've found.   Its very stable on the Raspberry Pi as a broker, and for
> streams if the streams is run on another machine.
> 
> Steve
> 
> On Mon, 24 Aug 2020 at 17:49, Sophie Blee-Goldman 
> wrote:
> 
> > Yeah, if you weren't already running it, try upgrading Streams to 2.6. They
> > recently
> > added support for "all" platforms to rocksdb and we updated the dependency
> > to
> > get this fix in 2.6. See KAFKA-9225
> > <https://issues.apache.org/jira/browse/KAFKA-9225>
> >
> > If you already were running 2.6, then, that's unfortunate. You might have
> > some luck
> > asking the rocksdb folks if all else fails
> >
> > On Mon, Aug 24, 2020 at 5:46 PM John Roesler  wrote:
> >
> > > Hi Steve,
> > >
> > > Which version of Streams is this? I vaguely recall that we updated to a
> > > version of Rocks that’s compiled for ARM, and I think some people have
> > used
> > > it on ARM, but I might be misremembering.
> > >
> > > I’m afraid I can’t be much help in debugging this, but maybe some others
> > > on the list have more context. If all else fails, you can probably narrow
> > > it down to the Java RocksDB library. If you create a standalone java
> > > program using the same rocks dependency that we do, then you can pretty
> > > confidently raise it with the Rocks folks.
> > >
> > > If you want to side-step the issue while debugging this, plugging in a
> > > different store implementation (like the in-memory one) would probably
> > > work, although I’m sure memory is scarce on a Raspberry Pi.
> > >
> > > I hope this helps,
> > > -John
> > >
> > > On Mon, Aug 24, 2020, at 19:26, Steve Jones wrote:
> > > > I'm trying to install Kafka Streams on a Raspberry PI, it works fine
> > as a
> > > > broker, works fine as both a producer and consumer, but when I try and
> > > run
> > > > streams on the PI rather than on the Mac there is a linker issue:
> > > >
> > > > Exception in thread
> > > > "main-broker-f53264a1-0c70-445f-bf3f-bf634a9a1ed2-StreamThread-1"
> > > > java.lang.UnsatisfiedLinkError:
> > > /tmp/librocksdbjni15158764823832728522.so:
> > > > /tmp/librocksdbjni15158764823832728522.so: cannot open shared object
> > > file:
> > > > No such file or directory (Possible cause: can't load IA 32 .so on a
> > ARM
> > > > platform)
> > > >
> > > > at java.base/java.lang.ClassLoader$NativeLibrary.load0(Native Method)
> > > >
> > > > at
> > > java.base/java.lang.ClassLoader$NativeLibrary.load(ClassLoader.java:2452)
> > > >
> > > > at
> > > >
> > >
> > java.base/java.lang.ClassLoader$NativeLibrary.loadLibrary(ClassLoader.java:2508)
> > > >
> > > > at java.base/java.lang.ClassLoader.loadLibrary0(ClassLoader.java:2704)
> > > >
> > > > at java.base/java.lang.ClassLoader.loadLibrary(ClassLoader.java:2637)
> > > >
> > > >
> > > > I've recompiled rocksdb on the RaspberryPI and added that to the
> > loadpath
> > > > but the same error occurs.  I've done the Google search but not found
> > > > anything around streams on ARM (Raspberry PI) and what needs to be
> > > > recompiled/linked for this to work.
> > > >
> > > >
> > > > Help appreciated.
> > > >
> > > >
> > > > Steve Jones
> > > >
> > >
> >
>


Re: JNI linker issue on ARM (Raspberry PI)

2020-08-24 Thread John Roesler
Hi Steve,

Which version of Streams is this? I vaguely recall that we updated to a version 
of Rocks that’s compiled for ARM, and I think some people have used it on ARM, 
but I might be misremembering. 

I’m afraid I can’t be much help in debugging this, but maybe some others on the 
list have more context. If all else fails, you can probably narrow it down to 
the Java RocksDB library. If you create a standalone java program using the 
same rocks dependency that we do, then you can pretty confidently raise it with 
the Rocks folks.
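
For instance, a bare-bones check like this against the rocksdbjni dependency (the DB path is arbitrary) should fail with the same UnsatisfiedLinkError if the bundled native library can't be loaded on your platform:

import org.rocksdb.Options;
import org.rocksdb.RocksDB;

public class RocksCheck {
    public static void main(final String[] args) throws Exception {
        RocksDB.loadLibrary(); // extracts and links the bundled native library
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/rocks-check")) {
            db.put("hello".getBytes(), "world".getBytes());
            System.out.println(new String(db.get("hello".getBytes())));
        }
    }
}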

If you want to side-step the issue while debugging this, plugging in a 
different store implementation (like the in-memory one) would probably work, 
although I’m sure memory is scarce on a Raspberry Pi. 

I hope this helps,
-John

On Mon, Aug 24, 2020, at 19:26, Steve Jones wrote:
> I'm trying to install Kafka Streams on a Raspberry PI, it works fine as a
> broker, works fine as both a producer and consumer, but when I try and run
> streams on the PI rather than on the Mac there is a linker issue:
> 
> Exception in thread
> "main-broker-f53264a1-0c70-445f-bf3f-bf634a9a1ed2-StreamThread-1"
> java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni15158764823832728522.so:
> /tmp/librocksdbjni15158764823832728522.so: cannot open shared object file:
> No such file or directory (Possible cause: can't load IA 32 .so on a ARM
> platform)
> 
> at java.base/java.lang.ClassLoader$NativeLibrary.load0(Native Method)
> 
> at java.base/java.lang.ClassLoader$NativeLibrary.load(ClassLoader.java:2452)
> 
> at
> java.base/java.lang.ClassLoader$NativeLibrary.loadLibrary(ClassLoader.java:2508)
> 
> at java.base/java.lang.ClassLoader.loadLibrary0(ClassLoader.java:2704)
> 
> at java.base/java.lang.ClassLoader.loadLibrary(ClassLoader.java:2637)
> 
> 
> I've recompiled rocksdb on the RaspberryPI and added that to the loadpath
> but the same error occurs.  I've done the Google search but not found
> anything around streams on ARM (Raspberry PI) and what needs to be
> recompiled/linked for this to work.
> 
> 
> Help appreciated.
> 
> 
> Steve Jones
>


Re: Request to add me to the contributors list

2020-08-17 Thread John Roesler
Hello Sanjay,

I've just added you to the contributor list in Jira, so you
should be able to assign tickets now.

Thanks for your interest in the project!

-John

On Sun, 2020-08-16 at 21:27 -0500, Sanjay Y R wrote:
> Hello,
> 
> I am Sanjay. I am a Kafka user intending to contribute to Kafka Open Source
> codebase. Right now, I am going through the Kafka documentation and looking
> to fix any issues I find. Could you please add me to the Kafka
> contributors list so that I can assign the issue to myself? My details are
> as follows:
> 
> Name: Sanjay Yellambalase Ravikumar
> JIRA Username: *sanjayyr*
> Email: sanjayyr...@gmail.com
> 
> Looking forward to hearing from you.
> 
> Thank you,
> Sanjay



Re: Documentation suggestion / issue and question about ticket openning

2020-08-12 Thread John Roesler
Hello Ahmed,

Thanks for this feedback. I can see what you mean.

I know that there is a redesign currently in progress for
the site, but I'm not sure if the API/Config documentation
is planned as part of that effort. Here's the PR to re-
design the home page: 
https://github.com/apache/kafka-site/pull/269

I think it would be best to open a Jira ticket at: 
https://issues.apache.org/jira/projects/KAFKA/issues

Thanks,
-John

On Wed, 2020-08-12 at 12:59 +, Ahmed Al-Saghir wrote:
> Dears kafka team,
> Kindly note that Kafka documentation navigation is pretty hard 
> on the eyes and exhausting. Once I'm on a section or reading a configuration, I 
> can't know what section I'm currently looking at or under what category. This 
> is very annoying and very hard. For example
> 
> https://kafka.apache.org/documentation/#offset.flush.timeout.ms
> 
> Once you open that URL, would you able to tell me exactly what navigation 
> steps to follow to access offset.flush.timeout.ms directly ? What section 
> this configuration is under ? Consumer, Producer or Broker ? Please allow 
> easier documentation navigation for sake of easiness and usability of users.
> 
> I'm not sure if this is categorized as suggestion or issue so how or what to 
> do and to open ticket with it. Sorry for my English. Please guide me if there 
> is steps to take to report this.
> 
> Thank you
> 
> Ahmed Al-Saghir



[ANNOUNCE] Apache Kafka 2.5.1

2020-08-11 Thread John Roesler
The Apache Kafka community is pleased to announce the
release for Apache Kafka 2.5.1

This is a bug fix release, and it includes fixes and
improvements for 72 issues, including some critical bugs.

All of the changes in this release can be found in the
release notes:
https://www.apache.org/dist/kafka/2.5.1/RELEASE_NOTES.html


You can download the source and binary release (Scala 2.12
and 2.13) from:
https://kafka.apache.org/downloads#2.5.1


---


Apache Kafka is a distributed streaming platform with four
core APIs:


** The Producer API allows an application to publish a
stream of records to one or more Kafka topics.

** The Consumer API allows an application to subscribe to
one or more topics and process the stream of records
produced to them.

** The Streams API allows an application to act as a stream
processor, consuming an input stream from one or more topics
and producing an output stream to one or more output topics,
effectively transforming the input streams to output
streams.

** The Connector API allows building and running reusable
producers or consumers that connect Kafka topics to existing
applications or data systems. For example, a connector to a
relational database might capture every change to a table.


With these APIs, Kafka can be used for two broad classes of
application:

** Building real-time streaming data pipelines that reliably
get data between systems or applications.

** Building real-time streaming applications that transform
or react to the streams of data.


Apache Kafka is in use at large and small companies
worldwide, including Capital One, Goldman Sachs, ING,
LinkedIn, Netflix, Pinterest, Rabobank, Target, The New York
Times, Uber, Yelp, and Zalando, among others.

A big thank you for the following 42 contributors to this
release!

Adam Bellemare, Andras Katona, Andy Coates, Anna Povzner, A.
Sophie Blee-Goldman, Auston, belugabehr, Bill Bejeck, Boyang
Chen, Bruno Cadonna, Chia-Ping Tsai, Chris Egerton, David
Arthur, David Jacot, Dezhi “Andy” Fang, Dima Reznik, Ego,
Evelyn Bayes, Ewen Cheslack-Postava, Greg Harris, Guozhang
Wang, Ismael Juma, Jason Gustafson, Jeff Widman, Jeremy
Custenborder, jiameixie, John Roesler, Jorge Esteban
Quilcate Otoya, Konstantine Karantasis, Lucent-Wong, Mario
Molina, Matthias J. Sax, Navinder Pal Singh Brar, Nikolay,
Rajini Sivaram, Randall Hauch, Sanjana Kaundinya, showuon,
Steve Rodrigues, Tom Bentley, Tu V. Tran, vinoth chandar

We welcome your help and feedback. For more information on
how to report problems, and to get involved, visit the
project website at https://kafka.apache.org/

Thank you!


Regards,

John Roesler



Re: join not working in relation to how StreamsBuilder builds the topology

2020-08-11 Thread John Roesler
Hi Mathieu,

Aaah, that is a bummer. The littlest things can be
the hardest to find. Well, I'm glad I was able to
help in some capacity.

Cheers,
-John

On Tue, 2020-08-11 at 16:02 +0200, Mathieu D
wrote:
> Hi John,
> 
> Thanks for your answer.
> 
> Reading your suggestions forced me to reconsider one more time the
> partitioner set on inputs. I already challenged all of them .. except one
> input that was provided in my test infrastructure by kafkacat.
> And it appears kafkacat is not using by default the same partitioner as we
> have in kafka-streams world. Bummer.
> (aaand I spent entire days on it)
> 
> Problem solved
> Thanks
> 
> Mathieu
> 
> Le mar. 11 août 2020 à 07:18, John Roesler  a écrit :
> 
> > Hi Mathieu,
> > 
> > That sounds frustrating. I’m sorry for the trouble.
> > 
> > From what you described, it does sound like something wacky is going on
> > with the partitioning. In particular, the fact that both joins work when
> > you set everything to 1 partition.
> > 
> > You mentioned that you’re using the default partitioner everywhere. Can
> > you confirm whether all the source topics and all the repartition topics
> > also have the same number of partitions (when you’re not forcing them to 1,
> > of course)?
> > 
> > Are the transformers changing the keys? If they are not, then you can use
> > transformValues to avoid the repartition. If they are, then the repartition
> > topics are indeed necessary. Streams should ensure that the repartition
> > topics get the same number of partitions as the topic you’re joining with.
> > 
> > As you mentioned, I can only speculate without seeing the code. I think my
> > next step would be to find a specific null join output that you think
> > should have been non-null and trace the key back through the topology. You
> > should be able to locate the key in each of the topics and state stores to
> > verify that it’s in the right partition all the way through.
> > 
> > You could also experiment with the trace logs, but they are super verbose.
> > Or you could try running the app in an IDE and setting breakpoints to
> > figure out what is happening each step of the way.
> > 
> > The funny thing about a leftJoin in particular is that it would only be
> > null if you’re getting records from the right, but none from the left. Any
> > record from the left would instead produce a K:(LeftVal, null) result. It
> > seems like even if the repartition is somehow going to the wrong partition,
> > you should see the (left, null) result at some point. I’m struggling to
> > think why you would only see null results.
> > 
> > 
> > I hope this helps!
> > -John
> > 
> > 
> > On Mon, Aug 10, 2020, at 09:44, Mathieu D wrote:
> > > Dear community,
> > > 
> > > I have a quite tough problem I struggle to diagnose and fix.
> > > I'm not sure if it's a bug in Kafka-streams or some subtlety I didn't get
> > > in using the DSL api.
> > > 
> > > The problem is the following.
> > > We have a quite elaborate stream app, working well, in production. We'd
> > > like to add a left join with a KTable (data is coming from a DB via kafka
> > > connect jdbc source).
> > > So we end-up with a topology like this:
> > > 
> > > Event Source  (some transformations and joins) - leftJoin(A:
> > > KTable) - leftJoin(B: KTable)  sinks
> > > 
> > > The new leftjoin is the one joining A.
> > > The transformations are several custom Transformers.
> > > 
> > > In tests with TopologyTestDriver, all is good, we can validate the
> > general
> > > logic.
> > > In integration tests with a real Kafka (in a docker, though), we can't
> > > manage to have both left joins work at the same time !
> > > The leftJoin with `A` always return null.
> > > 
> > > I ran dozen of tests, tweaking and fiddling everything, and I found out
> > > that it's related to partitioning. If I force the number of partitions
> > down
> > > to 1 (by setting all input topics to 1 partition), the join works.
> > > In one of the tests, I suspected one of the transformations, so I removed
> > > it. The topology shown by describe() changed quite significantly (going
> > > from 2 subtoplogies to 1), and this made the leftJoin with A work...and
> > the
> > > leftJoin with B fail.
> > > 
> > > It drives me crazy.
> > > 
> > > Activating the optimization didn't help.
> > > The input topics for KTables A a

Re: join not working in relation to how StreamsBuilder builds the topology

2020-08-10 Thread John Roesler
Hi Mathieu,

That sounds frustrating. I’m sorry for the trouble.

From what you described, it does sound like something wacky is going on with 
the partitioning. In particular, the fact that both joins work when you set 
everything to 1 partition. 

You mentioned that you’re using the default partitioner everywhere. Can you 
confirm whether all the source topics and all the repartition topics also have 
the same number of partitions (when you’re not forcing them to 1, of course)?

Are the transformers changing the keys? If they are not, then you can use 
transformValues to avoid the repartition. If they are, then the repartition 
topics are indeed necessary. Streams should ensure that the repartition topics 
get the same number of partitions as the topic you’re joining with.

As you mentioned, I can only speculate without seeing the code. I think my next 
step would be to find a specific null join output that you think should have 
been non-null and trace the key back through the topology. You should be able 
to locate the key in each of the topics and state stores to verify that it’s in 
the right partition all the way through. 
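
For example, assuming string keys and the default (murmur2-based) partitioner, a little standalone check like this tells you which partition a given key should land on (the key and partition count are placeholders):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;

public class PartitionCheck {
    public static void main(final String[] args) {
        final String key = "some-key";   // a key you traced through the topology
        final int numPartitions = 6;     // partition count of the topic in question
        final byte[] keyBytes = Serdes.String().serializer().serialize("unused-topic", key);
        // same formula the Java producer uses for keyed records
        final int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        System.out.println(key + " -> partition " + partition);
    }
}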

You could also experiment with the trace logs, but they are super verbose. Or 
you could try running the app in an IDE and setting breakpoints to figure out 
what is happening each step of the way.

The funny thing about a leftJoin in particular is that it would only be null if 
you’re getting records from the right, but none from the left. Any record from 
the left would instead produce a K:(LeftVal, null) result. It seems like even 
if the repartition is somehow going to the wrong partition, you should see the 
(left, null) result at some point. I’m struggling to think why you would only 
see null results. 


I hope this helps!
-John


On Mon, Aug 10, 2020, at 09:44, Mathieu D wrote:
> Dear community,
> 
> I have a quite tough problem I struggle to diagnose and fix.
> I'm not sure if it's a bug in Kafka-streams or some subtlety I didn't get
> in using the DSL api.
> 
> The problem is the following.
> We have a quite elaborate stream app, working well, in production. We'd
> like to add a left join with a KTable (data is coming from a DB via kafka
> connect jdbc source).
> So we end-up with a topology like this:
> 
> Event Source  (some transformations and joins) - leftJoin(A:
> KTable) - leftJoin(B: KTable)  sinks
> 
> The new leftjoin is the one joining A.
> The transformations are several custom Transformers.
> 
> In tests with TopologyTestDriver, all is good, we can validate the general
> logic.
> In integration tests with a real Kafka (in a docker, though), we can't
> manage to have both left joins work at the same time !
> The leftJoin with `A` always return null.
> 
> I ran dozen of tests, tweaking and fiddling everything, and I found out
> that it's related to partitioning. If I force the number of partitions down
> to 1 (by setting all input topics to 1 partition), the join works.
> In one of the tests, I suspected one of the transformations, so I removed
> it. The topology shown by describe() changed quite significantly (going
> from 2 subtoplogies to 1), and this made the leftJoin with A work...and the
> leftJoin with B fail.
> 
> It drives me crazy.
> 
> Activating the optimization didn't help.
> The input topics for KTables A and B are read with a TimestamExtractor to
> 0, since this is static data, to make sure we don't run into timestamp
> ordering issues.
> We double-checked and tripled-checked the keys in various stages, and we're
> sure they're good (by the way, it works with 1 partition).
> Partitioner is always the default everywhere (in inputs, kafka-connect...),
> actually we never touch this.
> 
> Actually it seems related to repartitioning placed in the topology by
> StreamsBuilder (probably in relation to transformers ?)
> 
> So, I imagine you can't help much without seeing the code, but if you think
> of anything that could help diagnosing this further, please tell.
> 
> Mathieu
>


[RESULTS] [VOTE] Release Kafka version 2.5.1

2020-08-04 Thread John Roesler
Hello all,

This vote passes with four +1 votes (3 binding) and no 0 or -1 votes.

+1 votes
PMC Members (in voting order):
* Ismael Juma
* Manikumar Reddy
* Mickael Maison 

Committers (in voting order):
* John Roesler

Community:
* No votes

0 votes
* No votes

-1 votes
* No votes

Vote thread:
https://lists.apache.org/x/thread.html/r31663dd65bce69eebc1bba73914e3cb6d4ae33d3e4bc473af211c4a6@%3Cusers.kafka.apache.org%3E

I'll continue with the release process and the release announcement will
follow early next week. 

Thanks,
John Roesler


Re: [VOTE] 2.5.1 RC0

2020-07-30 Thread John Roesler
Hello again all,

Just a reminder that the 2.5.1 RC0 is available for verification.

Thanks,
John

On Thu, Jul 23, 2020, at 21:39, John Roesler wrote:
> Hello Kafka users, developers and client-developers,
> 
> This is the first candidate for release of Apache Kafka 2.5.1.
> 
> Apache Kafka 2.5.1 is a bugfix release and fixes 72 issues since the 
> 2.5.0 release. Please see the release notes for more information.
> 
> Release notes for the 2.5.1 release:
> https://home.apache.org/~vvcephei/kafka-2.5.1-rc0/RELEASE_NOTES.html
> 
> *** Please download, test and vote by Tuesday, 28 July 2020, 5pm Pacific
> 
> Kafka's KEYS file containing PGP keys we use to sign the release:
> https://kafka.apache.org/KEYS
> 
> * Release artifacts to be voted upon (source and binary):
> https://home.apache.org/~vvcephei/kafka-2.5.1-rc0/
> 
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
> 
> * Javadoc:
> https://home.apache.org/~vvcephei/kafka-2.5.1-rc0/javadoc/
> 
> * Tag to be voted upon (off 2.5 branch) is the 2.5.1 tag:
> https://github.com/apache/kafka/releases/tag/2.5.1-rc0
> 
> * Documentation:
> https://kafka.apache.org/25/documentation.html
> 
> * Protocol:
> https://kafka.apache.org/25/protocol.html
> 
> * Successful Jenkins builds for the 2.5 branch:
> Unit/integration tests: 
> https://builds.apache.org/job/kafka-2.5-jdk8/170/
> System tests: 
> https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4066
> System test results: 
> http://confluent-kafka-2-5-system-test-results.s3-us-west-2.amazonaws.com/2020-07-22--001.1595474441--apache--2.5--7f9187fe3/report.html
> 
> Thanks,
> John
>


[VOTE] 2.5.1 RC0

2020-07-23 Thread John Roesler
Hello Kafka users, developers and client-developers,

This is the first candidate for release of Apache Kafka 2.5.1.

Apache Kafka 2.5.1 is a bugfix release and fixes 72 issues since the 2.5.0 
release. Please see the release notes for more information.

Release notes for the 2.5.1 release:
https://home.apache.org/~vvcephei/kafka-2.5.1-rc0/RELEASE_NOTES.html

*** Please download, test and vote by Tuesday, 28 July 2020, 5pm Pacific

Kafka's KEYS file containing PGP keys we use to sign the release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~vvcephei/kafka-2.5.1-rc0/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
https://home.apache.org/~vvcephei/kafka-2.5.1-rc0/javadoc/

* Tag to be voted upon (off 2.5 branch) is the 2.5.1 tag:
https://github.com/apache/kafka/releases/tag/2.5.1-rc0

* Documentation:
https://kafka.apache.org/25/documentation.html

* Protocol:
https://kafka.apache.org/25/protocol.html

* Successful Jenkins builds for the 2.5 branch:
Unit/integration tests: https://builds.apache.org/job/kafka-2.5-jdk8/170/
System tests: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4066
System test results: 
http://confluent-kafka-2-5-system-test-results.s3-us-west-2.amazonaws.com/2020-07-22--001.1595474441--apache--2.5--7f9187fe3/report.html

Thanks,
John


Re: Confluent Platform- KTable clarification

2020-07-22 Thread John Roesler
Hello Nag,

Yes, your conclusion sounds right.

“Sum the values per key” is a statement that doesn’t really make sense in a 
KTable context, since there is always just one value per key (the latest 
update).

I think the docs are just trying to drive the point home that in a KTable, 
there is just one value per key, whereas in a KStream, each key has a sequence 
of values. 
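
For example, a rough sketch of both readings over a hypothetical "user-clicks" topic (serdes chosen just for illustration):

StreamsBuilder builder = new StreamsBuilder();

// KStream reading: each record is an independent event, so summing per key gives 4 for alice
KTable<String, Long> summed = builder
    .stream("user-clicks", Consumed.with(Serdes.String(), Serdes.Long()))
    .groupByKey()
    .reduce(Long::sum);

// KTable reading: the second record is an update of the first, so the latest value for alice is 3
KTable<String, Long> latest = builder
    .table("user-clicks", Consumed.with(Serdes.String(), Serdes.Long()));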

Thanks,
John

On Wed, Jul 22, 2020, at 10:22, Nag Y wrote:
> I understood A KStream is an abstraction of a record stream and A KTable is
> an abstraction of a changelog stream ( updates or inserts) and the
> semantics around it.
> 
> However, this is where some confusion arises .. From confluent documentation
> 
> 
> To illustrate, let’s imagine the following two data records are being sent
> to the stream:
> 
> ("alice", 1) --> ("alice", 3)
> 
> *If your stream processing application were to sum the values per user*, it
> would return 3 for alice. Why? Because the second data record would be
> considered an update of the previous record. Compare this behavior of
> KTable with the illustration for KStream above, which would return 4 for
> alice.
> 
> Coming to the highlighted area , *if we were to sum the values* , it should
> be 4 . right ? However, *if we were to look at the "updated" view of the
> logs* , yes , it is 3 as KTable maintains either updates or inserts . Did I
> get it right ?
>


Re: [ANNOUNCE] New committer: Boyang Chen

2020-06-22 Thread John Roesler
That’s great news! Congratulations, Boyang. It’s well deserved.
-John

On Mon, Jun 22, 2020, at 18:26, Guozhang Wang wrote:
> The PMC for Apache Kafka has invited Boyang Chen as a committer and we are
> pleased to announce that he has accepted!
> 
> Boyang has been active in the Kafka community more than two years ago.
> Since then he has presented his experience operating with Kafka Streams at
> Pinterest as well as several feature development including rebalance
> improvements (KIP-345) and exactly-once scalability improvements (KIP-447)
> in various Kafka Summit and Kafka Meetups. More recently he's also been
> participating in Kafka broker development including post-Zookeeper
> controller design (KIP-500). Besides all the code contributions, Boyang has
> also helped reviewing even more PRs and KIPs than his own.
> 
> Thanks for all the contributions Boyang! And look forward to more
> collaborations with you on Apache Kafka.
> 
> 
> -- Guozhang, on behalf of the Apache Kafka PMC
>


Re: Clients may fetch incomplete set of topic partitions during cluster startup

2020-05-29 Thread John Roesler
Hello,

Thanks for the question. It looks like the ticket is still open,
so I think it's safe to say it's not fixed.

If you're affected by the issue, it would be helpful to leave
a comment on the ticket to that effect.

Thanks,
-John

On Fri, May 29, 2020, at 00:05, Debraj Manna wrote:
> Anyone any update on my below query?
> 
> On Thu, 28 May 2020, 15:45 Debraj Manna,  wrote:
> 
> > Hi
> >
> > Is the below issue fixed in latest Kafka 2.5?
> >
> > https://issues.apache.org/jira/browse/KAFKA-8480
> >
> > I am seeing this issue still open. So just confirming before upgrading
> > Kafka to the latest.
> >
> > Thanks,
> >
> >
>


Re: NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

2020-05-28 Thread John Roesler
Woah, that's a nasty bug. I've just pinged the Jira ticket. Please feel free to
do the same.

Thanks,
-John

On Thu, May 28, 2020, at 02:55, Pushkar Deole wrote:
> Thanks for the help Guozhang!
> however i realized that the exception and actual problem is totally
> different. The problem was the client was not set with SSL truststore while
> server is SSLenabled.
> I also found this open bug on kafka
> https://issues.apache.org/jira/browse/KAFKA-4493
> After setting the SSL properties on stream, I am able to get it up and
> running.
> 
> @kafka developers, I think the problem is very misleading and should be
> fixed as soon as possible, or a proper exception should be thrown.
> 
> On Thu, May 28, 2020 at 9:46 AM Guozhang Wang  wrote:
> 
> > Hello Pushkar,
> >
> > I think the memory pressure may not come from the topic data consumption,
> > but from rocksDB used for materializing the global table. Note rocksDB
> > allocates large chunk of memory beforehand in mem-table / page cache /
> > reader cache with default configs. You can get some detailed information
> > from this KIP:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Record+the+Memory+Used+by+RocksDB
> >
> >
> > Guozhang
> >
> >
> > On Wed, May 27, 2020 at 8:44 PM Pushkar Deole 
> > wrote:
> >
> > > Hello All,
> > >
> > > I am using Stream DSL API just to create a GlobalKTable backed by a
> > topic.
> > > The topology is simple, just create a global table from a topic and
> > that's
> > > it (pasted below code snippet), when I run this service on K8S cluster
> > > (container in a pod), the service gets OutOfMemoryError during
> > > kafkaStreams.start() method call (exception trace pasted below). Note
> > that
> > > the topic is newly created so there is no data in the topic. POD memory
> > was
> > > set initially to 500MiB which I doubled to 1000MiB but no luck.
> > > kafka-streams and kafka-clients jar at 2.3.1 version. Broker might be a
> > > version ahead I think 2.4 but that should not be an issue. Any help would
> > > be appreciated since I am blocked at this point.
> > >
> > > Properties props = new Properties();
> > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
> > > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
> > > StreamsBuilder streamsBuilder = new StreamsBuilder();
> > > GlobalKTable> groupCacheTable =
> > > streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
> > > Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
> > > Materialized.as(GROUP_CACHE_STORE_NAME));
> > > Topology groupCacheTopology = streamsBuilder.build();
> > > kafkaStreams = new KafkaStreams(groupCacheTopology, props);
> > > kafkaStreams.start();
> > >
> > > Runtime.getRuntime().addShutdownHook(new Thread(() -> {
> > > LOG.info("Stopping the stream");
> > > kafkaStreams.close();
> > > }));
> > >
> > >
> > >
> > {"@timestamp":"2020-05-28T03:11:39.719+00:00","@version":"1","message":"stream-client
> > > [DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9] State transition
> > from
> > > CREATED to
> > >
> > >
> > REBALANCING","logger_name":"org.apache.kafka.streams.KafkaStreams","thread_name":"main","level":"INFO","level_value":2}
> > >
> > >
> > {"@timestamp":"2020-05-28T03:11:43.532+00:00","@version":"1","message":"Uncaught
> > > exception in thread 'kafka-admin-client-thread |
> > >
> > >
> > DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-admin':","logger_name":"org.apache.kafka.common.utils.KafkaThread","thread_name":"kafka-admin-client-thread
> > > |
> > >
> > >
> > DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-admin","level":"ERROR","level_value":4,"stack_trace":"java.lang.OutOfMemoryError:
> > > Java heap space\n\tat java.base/java.nio.HeapByteBuffer.<init>(Unknown
> > > Source)\n\tat java.base/java.nio.ByteBuffer.allocate(Unknown
> > Source)\n\tat
> > >
> > >
> > org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)\n\tat
> > >
> > >
> > org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)\n\tat
> > >
> > >
> > org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)\n\tat
> > >
> > >
> > org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)\n\tat
> > >
> > >
> > org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)\n\tat
> > >
> > >
> > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)\n\tat
> > > org.apache.kafka.common.network.Selector.poll(Selector.java:483)\n\tat
> > > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)\n\tat
> > >
> > >
> > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152)\n\tat
> > > java.base/java.lang.Thread.run(Unknown Source)\n"}
> > >
> > >
> > {"@timestamp":"2020-05-28T03:11:44.641+00:00","@version":"1","message":"Uncaught
> > > exception in thread 'kafka-producer-network-thread |
> > >
> > >
> > 

Re: Kafka Streams, WindowBy, Grace Period, Late events, Suppres operation

2020-05-11 Thread John Roesler
Hello Baki,

It looks like option 2 is really what you want. The purpose of the time window 
stores is to allow deleting old data when you need to group by a time 
dimension, which naturally results in an infinite key space. 

If you don’t want to wait for the final result, can you just not add the 
suppression? Its only purpose is to _not_ emit any data until _after_ the 
grace period expires. Without it, streams will still respect the grace period 
by updating the result whenever there is late arriving data.
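
In other words, something like this (the window type, sizes, and topic names are only an example) keeps emitting updated counts as late records arrive within the grace period, instead of waiting for a final result:

builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
       .groupByKey()
       .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
       .count()
       // no .suppress(...) here: every update, including late ones inside the grace period, flows downstream
       .toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().start())
       .to("event-counts", Produced.with(Serdes.String(), Serdes.Long()));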

Lastly, that is a check for overflow. The timestamp is supposed to be a 
timestamp in milliseconds since the epoch. If you’re getting an overflow, it 
means your time stamps are from the far future. You might want to manually 
inspect them. 
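
For a quick manual check (suspectTimestamp being whatever your timestamp extractor returns for one of those records):

System.out.println(java.time.Instant.ofEpochMilli(suspectTimestamp)); // prints a far-future date if the value isn't really epoch milliseconds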

I hope this helps,
John


On Sun, May 10, 2020, at 05:29, Baki Hayat wrote:
> Hello Friends,
> 
> I wrote into stackoverflow but also i am writing here,
> 
> I have couple of questions about window operation, grace period and late
> events.
> 
> Could you please check my problem about group by with adding time field as
> a key or window by and group by without time field ?
> 
> Here is detail explanation...
> 
> https://stackoverflow.com/questions/61680407/kafka-streams-groupby-late-event-persistentwindowstore-windowby-with-gra
>


Re: records with key as string and value as java ArrayList in topic

2020-05-11 Thread John Roesler
Oh, my mistake. I thought this was a different thread :)

You might want to check, but I don’t think there is a kip for a map serde. Of 
course, you’re welcome to start one. 

Thanks,
John

On Mon, May 11, 2020, at 09:14, John Roesler wrote:
> Hi Pushkar,
> 
> I don’t think there is. You’re welcome to start one if you think it 
> would be a useful addition.
> 
> Before worrying about it further, though, you might want to check the 
> InMemoryKeyValueStore implementation, since my answer was from memory. 
> 
> Thanks,
> John
> 
> On Mon, May 11, 2020, at 03:47, Pushkar Deole wrote:
> > John,
> > is there KIP in progress for supporting Java HashMap also?
> > 
> > On Sun, May 10, 2020, 00:47 John Roesler  wrote:
> > 
> > > Yes, that’s correct. It’s only for serializing the java type ‘byte[]’.
> > >
> > > On Thu, May 7, 2020, at 10:37, Pushkar Deole wrote:
> > > > Thanks John... I got to finish the work in few days so need to get it
> > > > quick, so looking for something ready. I will take a look at jackson
> > > json.
> > > >
> > > > By the way, what is the byteArrayserializer? As the name suggests, it is
> > > > for byte arrays so won't work for java ArrayList, right?
> > > >
> > > > On Thu, May 7, 2020 at 8:44 PM John Roesler  wrote:
> > > >
> > > > > Hi Pushkar,
> > > > >
> > > > > If you’re not too concerned about compactness, I think Jackson json
> > > > > serialization is the easiest way to serialize complex types.
> > > > >
> > > > > There’s also a kip in progress to add a list serde. You might take a
> > > look
> > > > > at that proposal for ideas to write your own.
> > > > >
> > > > > Thanks,
> > > > > John
> > > > >
> > > > > On Thu, May 7, 2020, at 08:17, Nicolas Carlot wrote:
> > > > > > Won't say it's a good idea to use java serialized classes for
> > > messages,
> > > > > but
> > > > > > you should use a byteArraySerializer if you want to do such things
> > > > > >
> > > > > > Le jeu. 7 mai 2020 à 14:32, Pushkar Deole  a
> > > > > écrit :
> > > > > >
> > > > > > > Hi All,
> > > > > > >
> > > > > > > I have a requirement to store a record with key as java String and
> > > > > value as
> > > > > > > java's ArrayList in the kafka topic. Kafka has by default provided
> > > a
> > > > > > > StringSerializer and StringDeserializer, however for java
> > > ArrayList,
> > > > > how
> > > > > > > can get serializer. Do I need to write my own? Can someone share 
> > > > > > > if
> > > > > someone
> > > > > > > already has written one?
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > *Nicolas Carlot*
> > > > > >
> > > > > > Lead dev
> > > > > > |  | nicolas.car...@chronopost.fr
> > > > > >
> > > > > >
> > > > > > *Veuillez noter qu'à partir du 20 mai, le siège Chronopost déménage.
> > > La
> > > > > > nouvelle adresse est : 3 boulevard Romain Rolland 75014 Paris*
> > > > > >
> > > > > > [image: Logo Chronopost]
> > > > > > | chronopost.fr <http://www.chronopost.fr/>
> > > > > > Suivez nous sur Facebook <https://fr-fr.facebook.com/chronopost> et
> > > > > Twitter
> > > > > > <https://twitter.com/chronopost>.
> > > > > >
> > > > > > [image: DPD Group]
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: records with key as string and value as java ArrayList in topic

2020-05-11 Thread John Roesler
Hi Pushkar,

I don’t think there is. You’re welcome to start one if you think it would be a 
useful addition.

Before worrying about it further, though, you might want to check the 
InMemoryKeyValueStore implementation, since my answer was from memory. 

Thanks,
John

On Mon, May 11, 2020, at 03:47, Pushkar Deole wrote:
> John,
> is there KIP in progress for supporting Java HashMap also?
> 
> On Sun, May 10, 2020, 00:47 John Roesler  wrote:
> 
> > Yes, that’s correct. It’s only for serializing the java type ‘byte[]’.
> >
> > On Thu, May 7, 2020, at 10:37, Pushkar Deole wrote:
> > > Thanks John... I got to finish the work in few days so need to get it
> > > quick, so looking for something ready. I will take a look at jackson
> > json.
> > >
> > > By the way, what is the byteArrayserializer? As the name suggests, it is
> > > for byte arrays so won't work for java ArrayList, right?
> > >
> > > On Thu, May 7, 2020 at 8:44 PM John Roesler  wrote:
> > >
> > > > Hi Pushkar,
> > > >
> > > > If you’re not too concerned about compactness, I think Jackson json
> > > > serialization is the easiest way to serialize complex types.
> > > >
> > > > There’s also a kip in progress to add a list serde. You might take a
> > look
> > > > at that proposal for ideas to write your own.
> > > >
> > > > Thanks,
> > > > John
> > > >
> > > > On Thu, May 7, 2020, at 08:17, Nicolas Carlot wrote:
> > > > > Won't say it's a good idea to use java serialized classes for
> > messages,
> > > > but
> > > > > you should use a byteArraySerializer if you want to do such things
> > > > >
> > > > > Le jeu. 7 mai 2020 à 14:32, Pushkar Deole  a
> > > > écrit :
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > I have a requirement to store a record with key as java String and
> > > > value as
> > > > > > java's ArrayList in the kafka topic. Kafka has by default provided
> > a
> > > > > > StringSerializer and StringDeserializer, however for java
> > ArrayList,
> > > > how
> > > > > > can get serializer. Do I need to write my own? Can someone share if
> > > > someone
> > > > > > already has written one?
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > *Nicolas Carlot*
> > > > >
> > > > > Lead dev
> > > > > |  | nicolas.car...@chronopost.fr
> > > > >
> > > > >
> > > > > *Veuillez noter qu'à partir du 20 mai, le siège Chronopost déménage.
> > La
> > > > > nouvelle adresse est : 3 boulevard Romain Rolland 75014 Paris*
> > > > >
> > > > > [image: Logo Chronopost]
> > > > > | chronopost.fr <http://www.chronopost.fr/>
> > > > > Suivez nous sur Facebook <https://fr-fr.facebook.com/chronopost> et
> > > > Twitter
> > > > > <https://twitter.com/chronopost>.
> > > > >
> > > > > [image: DPD Group]
> > > > >
> > > >
> > >
> >
>


Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-11 Thread John Roesler
Hi Pushkar,

I’m glad you’ve been able to work through the issues.

The GlobalKTable does store the data in memory (or on disk, depending on how you 
configure it). I think the in-memory version uses a TreeMap, which is 
logarithmic time access. I think you’ll find it sufficiently fast regardless. 
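
For what it's worth, you choose between the two when you materialize the global table; a rough sketch (the topic, store name, value type, and serde are placeholders):

// default materialization is a RocksDB-backed store:
GlobalKTable<String, Agent> agents = builder.globalTable("agent-admin",
    Consumed.with(Serdes.String(), agentSerde),
    Materialized.as("agent-store"));

// alternatively, an in-memory store (same call, different Materialized):
//   Materialized.<String, Agent>as(Stores.inMemoryKeyValueStore("agent-store"))
//       .withKeySerde(Serdes.String()).withValueSerde(agentSerde)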

Thanks,
John

On Mon, May 11, 2020, at 06:51, Pushkar Deole wrote:
> John,
> 
> I think I can get the cache structure modified to make use of GlobalKTable
> here so the data can be shared across. I could get information that the
> admin data will be uploaded well in advance before main events so the issue
> with 'missed joins' won't exists since by the time main events start
> flowing, the admin data has been synchronized to all the service instances.
> 
> By the way, I would like to ask you how GlobalKTable works internally, does
> it store all data in memory or it gets it from the backed topic everytime?
> Secondly, what kind of internal data structure does it use? Is it good for
> constant time performance?
> 
> On Thu, May 7, 2020 at 7:27 PM John Roesler  wrote:
> 
> > Hi Pushkar,
> >
> > To answer your question about tuning the global store latency, I think the
> > biggest impact thing you can do is to configure the consumer that loads the
> > data for global stores. You can pass configs specifically to the global
> > consumer with the prefix: “ global.consumer.”
> >
> > Regarding the larger situation, it seems like the global table and a
> > distributed cache would display the same basic behavior in terms of the
> > potential for missed joins. Then, it probably makes sense to go for the
> > option with fewer components to implement and maintain, which to me points
> > to the global KTable.
> >
> > Since you can anticipate that missed joins can be a problem, you can build
> > in some metrics and reporting for how many misses you actually observe, and
> > potentially redesign the app if it’s actually a problem.
> >
> > I hope this helps!
> > -John
> >
> > On Tue, May 5, 2020, at 01:23, Pushkar Deole wrote:
> > > Thanks John... appreciate your inputs and suggestions. I have been
> > assigned
> > > recently to this task (of persisting the cache) and haven't been involved
> > > in original design and architecture and agree with all the issues you
> > have
> > > highlighted.
> > > However, at this point, i don't think the application can be converted to
> > > streams since the design is not flexible and it would require lot of
> > > rewrite of code plus subsequent testing.
> > >
> > > My first thought was to use external database only,  preferably the
> > > distributed caching systems like Apache Ignite since it will have least
> > > impact on performance. Going to database for every event would impact the
> > > throughput a lot. Probably having distributed caching (key/value pairs)
> > > would have comparatively lesser impact.
> > > Second choice is to go for GlobalKTable however this needs to be done
> > very
> > > carefully.
> > >
> > > Thanks again!
> > >
> > > On Mon, May 4, 2020 at 11:18 PM Pushkar Deole 
> > wrote:
> > >
> > > > Thanks John... what parameters would affect the latency in case
> > > > GlobalKTable will be used and is there any configurations that could be
> > > > tuned to minimize the latency of sync with input topic?
> > > >
> > > > On Mon, May 4, 2020 at 10:20 PM John Roesler 
> > wrote:
> > > >
> > > >> Hello Pushkar,
> > > >>
> > > >> Yes, that’s correct. The operation you describe is currently not
> > > >> supported. If you want to keep the structure you described in place,
> > I’d
> > > >> suggest using an external database for the admin objects. I’ll give
> > another
> > > >> idea below.
> > > >>
> > > >> With your current architecture, I’m a little concerned about data
> > races.
> > > >> From what I saw, nothing would prevent processing stream records with
> > agent
> > > >> 10 before you process the admin record with agent 10. This problem
> > will
> > > >> persist no matter where you locate the cache.
> > > >>
> > > >> GlobalKTable would no doubt make it worse, since it increases the
> > latency
> > > >> before admin record 10 is queriable everywhere.
> > > >>
> > > >> I think you’ll want to make a call between architecture simplicity
> &

Re: records with key as string and value as java ArrayList in topic

2020-05-09 Thread John Roesler
Yes, that’s correct. It’s only for serializing the java type ‘byte[]’.

On Thu, May 7, 2020, at 10:37, Pushkar Deole wrote:
> Thanks John... I got to finish the work in few days so need to get it
> quick, so looking for something ready. I will take a look at jackson json.
> 
> By the way, what is the byteArrayserializer? As the name suggests, it is
> for byte arrays so won't work for java ArrayList, right?
> 
> On Thu, May 7, 2020 at 8:44 PM John Roesler  wrote:
> 
> > Hi Pushkar,
> >
> > If you’re not too concerned about compactness, I think Jackson json
> > serialization is the easiest way to serialize complex types.
> >
> > There’s also a kip in progress to add a list serde. You might take a look
> > at that proposal for ideas to write your own.
> >
> > Thanks,
> > John
> >
> > On Thu, May 7, 2020, at 08:17, Nicolas Carlot wrote:
> > > Won't say it's a good idea to use java serialized classes for messages,
> > but
> > > you should use a byteArraySerializer if you want to do such things
> > >
> > > Le jeu. 7 mai 2020 à 14:32, Pushkar Deole  a
> > écrit :
> > >
> > > > Hi All,
> > > >
> > > > I have a requirement to store a record with key as java String and
> > value as
> > > > java's ArrayList in the kafka topic. Kafka has by default provided a
> > > > StringSerializer and StringDeserializer, however for java ArrayList,
> > how
> > > > can get serializer. Do I need to write my own? Can someone share if
> > someone
> > > > already has written one?
> > > >
> > >
> > >
> > > --
> > > *Nicolas Carlot*
> > >
> > > Lead dev
> > > |  | nicolas.car...@chronopost.fr
> > >
> > >
> > > *Veuillez noter qu'à partir du 20 mai, le siège Chronopost déménage. La
> > > nouvelle adresse est : 3 boulevard Romain Rolland 75014 Paris*
> > >
> > > [image: Logo Chronopost]
> > > | chronopost.fr <http://www.chronopost.fr/>
> > > Suivez nous sur Facebook <https://fr-fr.facebook.com/chronopost> et
> > Twitter
> > > <https://twitter.com/chronopost>.
> > >
> > > [image: DPD Group]
> > >
> >
>


Re: records with key as string and value as java ArrayList in topic

2020-05-07 Thread John Roesler
Hi Pushkar,

If you’re not too concerned about compactness, I think Jackson json 
serialization is the easiest way to serialize complex types. 

There’s also a kip in progress to add a list serde. You might take a look at 
that proposal for ideas to write your own. 
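
For example, a bare-bones Jackson-backed serde for a list of strings might look like this (the class name is arbitrary, and it assumes a clients version, 2.1 or later, where Serializer/Deserializer only need the serialize/deserialize methods):

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class StringListSerde implements Serde<List<String>> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public Serializer<List<String>> serializer() {
        return (topic, list) -> {
            try {
                return list == null ? null : MAPPER.writeValueAsBytes(list);
            } catch (final Exception e) {
                throw new RuntimeException("failed to serialize list", e);
            }
        };
    }

    @Override
    public Deserializer<List<String>> deserializer() {
        return (topic, bytes) -> {
            try {
                return bytes == null ? null : MAPPER.readValue(bytes, new TypeReference<List<String>>() { });
            } catch (final Exception e) {
                throw new RuntimeException("failed to deserialize list", e);
            }
        };
    }
}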

Thanks,
John

On Thu, May 7, 2020, at 08:17, Nicolas Carlot wrote:
> Won't say it's a good idea to use java serialized classes for messages, but
> you should use a byteArraySerializer if you want to do such things
> 
> Le jeu. 7 mai 2020 à 14:32, Pushkar Deole  a écrit :
> 
> > Hi All,
> >
> > I have a requirement to store a record with key as java String and value as
> > java's ArrayList in the kafka topic. Kafka has by default provided a
> > StringSerializer and StringDeserializer, however for java ArrayList, how
> > can get serializer. Do I need to write my own? Can someone share if someone
> > already has written one?
> >
> 
> 
> -- 
> *Nicolas Carlot*
> 
> Lead dev
> |  | nicolas.car...@chronopost.fr
> 
> 
> *Veuillez noter qu'à partir du 20 mai, le siège Chronopost déménage. La
> nouvelle adresse est : 3 boulevard Romain Rolland 75014 Paris*
> 
> [image: Logo Chronopost]
> | chronopost.fr 
> Suivez nous sur Facebook  et Twitter
> .
> 
> [image: DPD Group]
>


Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-07 Thread John Roesler
Hi Pushkar,

To answer your question about tuning the global store latency, I think the 
biggest impact thing you can do is to configure the consumer that loads the 
data for global stores. You can pass configs specifically to the global 
consumer with the prefix: “global.consumer.”
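For example (just a sketch; the particular properties and values here are placeholders, not recommendations):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

Properties props = new Properties();
// anything under the "global.consumer." prefix is stripped and passed only to the global consumer
props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 2000);
props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), 100);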

Regarding the larger situation, it seems like the global table and a 
distributed cache would display the same basic behavior in terms of the 
potential for missed joins. Then, it probably makes sense to go for the option 
with fewer components to implement and maintain, which to me points to the 
global KTable. 

Since you can anticipate that missed joins can be a problem, you can build in 
some metrics and reporting for how many misses you actually observe, and 
potentially redesign the app if it’s actually a problem. 

I hope this helps!
-John 

On Tue, May 5, 2020, at 01:23, Pushkar Deole wrote:
> Thanks John... appreciate your inputs and suggestions. I have been assigned
> recently to this task (of persisting the cache) and haven't been involved
> in original design and architecture and agree with all the issues you have
> highlighted.
> However, at this point, i don't think the application can be converted to
> streams since the design is not flexible and it would require lot of
> rewrite of code plus subsequent testing.
> 
> My first thought was to use external database only,  preferably the
> distributed caching systems like Apache Ignite since it will have least
> impact on performance. Going to database for every event would impact the
> throughput a lot. Probably having distributed caching (key/value pairs)
> would have comparatively lesser impact.
> Second choice is to go for GlobalKTable however this needs to be done very
> carefully.
> 
> Thanks again!
> 
> On Mon, May 4, 2020 at 11:18 PM Pushkar Deole  wrote:
> 
> > Thanks John... what parameters would affect the latency in case
> > GlobalKTable will be used and is there any configurations that could be
> > tuned to minimize the latency of sync with input topic?
> >
> > On Mon, May 4, 2020 at 10:20 PM John Roesler  wrote:
> >
> >> Hello Pushkar,
> >>
> >> Yes, that’s correct. The operation you describe is currently not
> >> supported. If you want to keep the structure you described in place, I’d
> >> suggest using an external database for the admin objects. I’ll give another
> >> idea below.
> >>
> >> With your current architecture, I’m a little concerned about data races.
> >> From what I saw, nothing would prevent processing stream records with agent
> >> 10 before you process the admin record with agent 10. This problem will
> >> persist no matter where you locate the cache.
> >>
> >> GlobalKTable would no doubt make it worse, since it increases the latency
> >> before admin record 10 is queriable everywhere.
> >>
> >> I think you’ll want to make a call between architecture simplicity
> >> (remote cache or global KTable) vs the probability of missed joins.
> >>
> >> I think the “best” way to solve this problem (that comes to mind anyway)
> >> might be to
> >> 1. Repartition the stream to be co-partitioned with the admin records.
> >> 2. Do a local (not global) stream-table join
> >> 3. Enable task idling
> >>
> >> You can do the repartition today with a ‘map’ or ‘selectKey’ to make the
> >> agent Id the new key of the stream, and then use ‘through’, (where the
> >> intermediate topic has the same number of partitions as the admin topic) to
> >> do the repartitioning. In 2.6, there is a “repartition” operator that will
> >> make this easier.
> >>
> >> The repartition ensures that all stream records with agent id 10 will be
> >> processed by the same thread that processes the admin records with agent id
> >> 10, hence it will be able to find agent 10 in the local KTable store.
> >>
> >> Task idling will minimize your chances of missing any enrichments. When a
> >> task has two inputs (E.g., your repartitioned stream joining with the admin
> >> table), it makes Streams wait until both inputs are buffered before
> >> processing, so it can do a better job of processing in timestamp order.
> >>
> >> I hope this helps!
> >> -John
> >>
> >> On Mon, May 4, 2020, at 05:30, Pushkar Deole wrote:
> >> > If i understand correctly, Kafka is not designed to provide replicated
> >> > caching mechanism wherein the updates to cache will be synchronous
> >> across
> >> > multiple cache instances.
> >> >
> >> > On Sun, May 3, 2020 at 10

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-04 Thread John Roesler
Hello Pushkar,

Yes, that’s correct. The operation you describe is currently not supported. If 
you want to keep the structure you described in place, I’d suggest using an 
external database for the admin objects. I’ll give another idea below.  

With your current architecture, I’m a little concerned about data races. From 
what I saw, nothing would prevent processing stream records with agent 10 
before you process the admin record with agent 10. This problem will persist no 
matter where you locate the cache.

GlobalKTable would no doubt make it worse, since it increases the latency 
before admin record 10 is queriable everywhere.

I think you’ll want to make a call between architecture simplicity (remote 
cache or global KTable) vs the probability of missed joins. 

I think the “best” way to solve this problem (that comes to mind anyway) might 
be to
1. Repartition the stream to be co-partitioned with the admin records.
2. Do a local (not global) stream-table join
3. Enable task idling

You can do the repartition today with a ‘map’ or ‘selectKey’ to make the agent 
Id the new key of the stream, and then use ‘through’, (where the intermediate 
topic has the same number of partitions as the admin topic) to do the 
repartitioning. In 2.6, there is a “repartition” operator that will make this 
easier. 
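A rough sketch of that shape in the DSL (the topic names, types, and enrich() call are made up for illustration; 'events', 'builder', and 'props' are assumed to already exist, and the intermediate topic is assumed to be pre-created with the same partition count as the admin topic):

// 1. key the stream by agent id and push it through a co-partitioned topic
KStream<String, Event> byAgent = events
    .selectKey((key, value) -> value.getAgentId())
    .through("events-by-agent");

// 2. local (not global) table over the admin topic, then a stream-table join
KTable<String, Agent> admins = builder.table("admin-topic");
KStream<String, Event> enriched =
    byAgent.join(admins, (event, agent) -> enrich(event, agent));

// 3. task idling: wait until both inputs are buffered before processing
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 1000L);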

The repartition ensures that all stream records with agent id 10 will be 
processed by the same thread that processes the admin records with agent id 10, 
hence it will be able to find agent 10 in the local KTable store. 

Task idling will minimize your chances of missing any enrichments. When a task 
has two inputs (E.g., your repartitioned stream joining with the admin table), 
it makes Streams wait until both inputs are buffered before processing, so it 
can do a better job of processing in timestamp order. 

I hope this helps!
-John 

On Mon, May 4, 2020, at 05:30, Pushkar Deole wrote:
> If i understand correctly, Kafka is not designed to provide replicated
> caching mechanism wherein the updates to cache will be synchronous across
> multiple cache instances.
> 
> On Sun, May 3, 2020 at 10:49 PM Pushkar Deole  wrote:
> 
> > Thanks John.
> >
> > Actually, this is a normal consumer-producer application wherein there are
> > 2 consumers (admin consumer and main consumer) consuming messages from 2
> > different topics.
> > One of the consumers consumes messages from a admin topic and populates
> > data in a cache e.g. lets say agent with agent id 10 for which the first
> > name and last name is received is populated in cache. When the other
> > consumer consumes message and it has agent id 10 then it reads the cache,
> > appends the first name and last name and then sends enriched event to
> > producer.
> > In this case, each application instance consumes all the events from admin
> > topic (unique consumer id) and keeps them in the cache in memory.
> > Now the requirement is to persist the cache and make is shared between the
> > application instances, so each instance would consume partitions of admin
> > topic and write to admin cache.
> >
> > If we want to use kafka streams, the application is so much evolved that
> > it is difficult to migrate to streams at this stage. Secondly, from past
> > mail chains, streams also won't serve the requirement since local state
> > stores would just hold the local state of admin data and the cache written
> > by each instance won't be shared with other instances.
> >
> > Global state stores may help but again it requires writing to the topic
> > which is then synced with the state stores in the instances and the
> > instances may not be in sync with each.
> > I am not sure if this would cause any inconsistencies since i don't know
> > how the events would flow from source e.g. if admin data is consumed by one
> > instance which then modified the topic but it is not yet synced to all the
> > global state stores and the next event arrived on the main consumer on a
> > different instance and it tried to read from store cache then it doesn't
> > get the data, so the event passed on without enriched data.
> > That's pretty much about the use case.
> >
> >
> > On Sun, May 3, 2020 at 9:42 PM John Roesler  wrote:
> >
> >> Hi Pushkar,
> >>
> >> I’ve been wondering if we should add writable tables to the Streams api.
> >> Can you explain more about your use case and how it would integrate with
> >> your application?
> >>
> >> Incidentally, this would also help us provide more concrete advice.
> >>
> >> Thanks!
> >> John
> >>
> >> On Fri, May 1, 2020, at 15:28, Matthias J. Sax wrote:
> >> > Both stores sever a differe

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-03 Thread John Roesler
Hi Pushkar,

I’ve been wondering if we should add writable tables to the Streams api. Can 
you explain more about your use case and how it would integrate with your 
application?

Incidentally, this would also help us provide more concrete advice. 

Thanks!
John

On Fri, May 1, 2020, at 15:28, Matthias J. Sax wrote:
> Both stores serve a different purpose.
> 
> Regular stores allow you to store state the application computes.
> Writing into the changelog is a fault-tolerance mechanism.
> 
> Global stores hold "auxiliary" data that is provided from "outside" of the
> app. There is no changelog topic, but only the input topic (that is used
> to re-create the global state).
> 
> Local stores are sharded and updates are "sync" as they don't need to be
> shared with anybody else.
> 
> For global stores, as all instances need to be updated, updates are
> async (we don't know when which instance will update it's own global
> store replica).
> 
> >> Say one stream thread updates the topic for global store and starts
> >> processing next event wherein the processor tries to read the global store
> >> which may not have been synced with the topic?
> 
> Correct. There is no guarantee when the update to the global store will
> be applied. As said, global stores are not designed to hold data the
> application computes.
> 
> 
> -Matthias
> 
> 
> On 4/30/20 11:11 PM, Pushkar Deole wrote:
> > thanks... will try with GlobalKTable.
> > As a side question, I didn't really understand the significance of global
> > state store which kind of works in a reverse way to local state store i.e.
> > local state store is updated and then saved to changelog topic whereas in
> > case of global state store the topic is updated first and then synced to
> > global state store. Do these two work in sync i.e. the update to topic and
> > global state store ?
> > 
> > Say one stream thread updates the topic for global store and starts
> > processing next event wherein the processor tries to read the global store
> > which may not have been synced with the topic?
> > 
> > On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax  wrote:
> > 
> >> Yes.
> >>
> >> A `GlobalKTable` uses a global store internally.
> >>
> >> You can also use `StreamsBuilder.addGlobalStore()` or
> >> `Topology.addGlobalStore()` to add a global store "manually".
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 4/30/20 7:42 AM, Pushkar Deole wrote:
> >>> Thanks Matthias.
> >>> Can you elaborate on the replicated caching layer part?
> >>> When you say global stores, do you mean GlobalKTable created from a topic
> >>> e.g. using StreamsBuilder.globalTable(String topic) method ?
> >>>
> >>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax 
> >> wrote:
> >>>
>  It's not possible to modify state store from "outside".
> 
>  If you want to build a "replicated caching layer", you could use global
>  stores and write into the corresponding topics to update all stores. Of
>  course, those updates would be async.
> 
> 
>  -Matthias
> 
>  On 4/29/20 10:52 PM, Pushkar Deole wrote:
> > Hi All,
> >
> > I am wondering if this is possible: i have been asked to use state
> >> stores
> > as a general replicated cache among multiple instances of service
>  instances
> > however the state store is created through streambuilder but is not
> > actually modified through stream processor topology however it is to be
> > modified from outside the stream topology. So, essentially, the state
>  store
> > is just to be created from streambuilder and then to be used as an
> > application level cache that will get replicated between application
> > instances. Is this possible using state stores?
> >
> > Secondly, if possible, is this a good design approach?
> >
> > Appreciate your response since I don't know the internals of state
>  stores.
> >
> 
> 
> >>>
> >>
> >>
> > 
> 
> 
> Attachments:
> * signature.asc


Re: Deserialization Exception when performing state store operations

2020-04-20 Thread John Roesler
Hi Carl,

That sounds pretty frustrating; sorry about that.

I think I got a hint, but I'm not totally clear on the situation. It shouldn't
be possible for data to get into the store if it can't be handled by the serde.
There is a specific issue with global stores, but it doesn't sound like that's
your situation.

It sounds like what has happened is that the serde itself has "forgotten"
how to handle data that it previously could handle, because someone
deleted the schema from the registry. Realistically, this sounds more like
a situation to prevent than one to handle. From an "ivory tower" perspective,
it seems like you could:
* establish a rule that once the schema has been used, it can only be
  evolved
* or that you would establish a deprecation period in which both
  the new and old schema are present, so you can translate all the records
* or if you really need to make a discontinuous change to the schema, it
  probably means that the store is now completely different in meaning, and
  you could just give it a different name and load it from scratch

But if nothing like that fits the bill, then you could consider wrapping the 
avro
serde in an error-handling serde that delegates to your avro serde, catches
the deserialization exception, and instead returns a sentinel value that you
can handle somehow?
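Something like this, purely as a sketch (the class name is made up; the point is just that the wrapper delegates and turns the failure into a sentinel):

import org.apache.kafka.common.serialization.Deserializer;

public class TolerantDeserializer<T> implements Deserializer<T> {
    private final Deserializer<T> inner; // e.g. your existing Avro deserializer

    public TolerantDeserializer(Deserializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return inner.deserialize(topic, data);
        } catch (RuntimeException e) {
            // schema gone / record unreadable: return a sentinel (null here) so the
            // store iterator caller can skip or delete the entry instead of throwing
            return null;
        }
    }

    @Override
    public void close() {
        inner.close();
    }
}

You could then build the store's Serde with Serdes.serdeFrom(yourSerializer, new TolerantDeserializer<>(yourDeserializer)).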

I hope this helps,
Thanks,
-John

On Mon, Apr 20, 2020, at 21:59, Carl Graving wrote:
> I have a state store being built from a stream with a custom set of value
> serdes. In the stream processing I am able to handle errors appropriately
> with the exception handler, but if I attempt to do a state store getAll and
> use the iterator.hasNext() or next() or peek methods, any exceptions in the
> deserialization are thrown. This is proving to be difficult to iterate over
> the items in the state store and skip bad entries or remove them. I can see
> different ways this could happen, such as avro schema removed from schema
> registry (and cache) for an item in the state store. Am I missing an easy
> way to deal with deserialization exceptions stemming from state store
> iterator or gets? I will keep playing around, but it was hard to find a way
> to reliably use the iterator (getAll) when hasNext, next, peekNext, etc all
> throw exceptions.
> 
> Thanks for any help or pointers on how to properly handle exceptions in
> this case.
> 
> Carl
>


Re: Kafka Streams - issues with windowing and suppress

2020-04-20 Thread John Roesler
Yes, thanks, Liam!

By the way, there's actually already a ticket to try and improve the API, 
and the discussed solution is basically the same thing I said had never
occurred to me before, so I'm not sure what to say about that...

https://issues.apache.org/jira/browse/KAFKA-8924

The ticket seems abandoned, though, so it might be up for grabs if you
want to make sure it gets resolved asap. I'll add a comment with my
proposal.

Thanks,
-John

On Mon, Apr 20, 2020, at 01:08, Matthias J. Sax wrote:
> Thanks for the PR!
> 
> On 4/19/20 10:04 PM, Liam Clarke-Hutchinson wrote:
> > PR submitted :) https://github.com/apache/kafka/pull/8520
> > 
> > On Mon, Apr 20, 2020 at 2:34 PM John Roesler  wrote:
> > 
> >> Hi Liam,
> >>
> >> That sounds like a good idea to me. In fact, I’d go so far as to say we
> >> should just change the existing example to include a grace period, and not
> >> bother with an extra example. That would put it front and center.
> >>
> >> A PR would be greatly appreciated! Thanks for the offer!
> >>
> >> Thanks,
> >> John
> >>
> >> On Sun, Apr 19, 2020, at 19:58, Liam Clarke wrote:
> >>> Hi Matthias,
> >>>
> >>> I think as an interim measure, if the windowing samples in the docs
> >> showed
> >>> an additional example where the grace period was set (with perhaps a
> >>> comment about the current default grace period, and planned future
> >>> changes?) it would make it sufficiently visible - happy to submit a PR
> >> with
> >>> those changes if it seems appropriate.
> >>>
> >>> Cheers,
> >>>
> >>> Liam Clarke-Hutchinson
> >>>
> >>> On Mon, Apr 20, 2020 at 12:12 PM Matthias J. Sax 
> >> wrote:
> >>>
> >>>> I would prefer to not make the grace-period a mandatory argument and
> >>>> keep the API as-is. I understand the issue of backward compatibility,
> >>>> but I would still argue that we should just change the default grace
> >>>> period to 0 in the 3.0 release. It's a major release and thus it seems
> >>>> to be fine. To prepare for this change, we could start to log a WARN
> >>>> message, if a user does not set the grace period explicitly for now.
> >>>>
> >>>> Just my 2 ct. Thoughts?
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 4/19/20 7:40 AM, John Roesler wrote:
> >>>>> Oh, man, that’s a good idea.
> >>>>>
> >>>>> I can propose to deprecate (not remove) the existing ‘of’ factory
> >> method
> >>>> and add one with a mandatory grace period. Not sure why I didn’t think
> >> of
> >>>> that before. Probably too caught up in looking for something “smart”.
> >>>>>
> >>>>> Thanks!
> >>>>> John
> >>>>>
> >>>>> On Sun, Apr 19, 2020, at 02:27, Liam Clarke wrote:
> >>>>>> Hi John,
> >>>>>>
> >>>>>> I can't really think of a way to make it more obvious without
> >> breaking
> >>>>>> backwards compatibility - e.g., obvious easy fix is that grace
> >> period
> >>>> is a
> >>>>>> mandatory arg to TimeWindows, but that would definitely break
> >>>> compatibility.
> >>>>>>
> >>>>>> Cheers,
> >>>>>>
> >>>>>> Liam Clarke-Hutchinson
> >>>>>>
> >>>>>> On Thu, Apr 16, 2020 at 1:59 AM John Roesler 
> >>>> wrote:
> >>>>>>
> >>>>>>> Boom, you got it, Liam! Nice debugging work.
> >>>>>>>
> >>>>>>> This is a pretty big bummer, but I had to do it that way for
> >>>>>>> compatibility. I added a log message to try and help reduce the
> >> risk,
> >>>> but
> >>>>>>> it’s still kind of a trap.
> >>>>>>>
> >>>>>>> I’d like to do a KIP at some point to consider changing the default
> >>>> grace
> >>>>>>> period, but haven’t done it because it’s not clear what the default
> >>>> should
> >>>>>>> be.
> >>>>>>>
> >>>>>>> Please let me know if you have any ideas!
> >>>>>>> Than

Re: Unexpected behaviour on windowing aggregations

2020-04-19 Thread John Roesler
Hey Liam,

Hah! Tell me about it...

Well, let’s hope that was it. 
Thanks,
John

On Sun, Apr 19, 2020, at 18:43, Liam Clarke wrote:
> Hi John,
> 
> Thanks for the reply - yep, that was a dumb copy and paste error, which is
> what I get for coding while surrounded by kids. >_< I'm deploying a fixed
> version of it as we speak. Thanks for the reply though :)
> 
> Kind regards,
> 
> Liam Clarke
> 
> 
> 
> On Mon, 20 Apr. 2020, 2:08 am John Roesler,  wrote:
> 
> > Hi Liam,
> >
> > I took a quick look. On the output side, it looks like you’re adding the
> > count to the prior count. Should that just set the outbound value to the new
> > count? Maybe I misunderstood the situation.
> >
> > What I mean is, suppose you get two events for the same window:
> >
> > Inbound map := 0+1 = 1
> > Count = 1
> > Outbound map := 0+1 = 1
> > (Proposed outbound := 1)
> >
> > Then,
> >
> > Inbound map := 1+1 = 2
> > Count = 2
> > Outbound map := 1+2 = 3
> > (Proposed outbound := 2)
> >
> > Does that make sense?
> > -John
> >
> > On Sun, Apr 19, 2020, at 03:08, Liam Clarke wrote:
> > > Hello all,
> > >
> > > I have been running this code against production data, and I'm emitting
> > > counts/sums for a sentinel record id to stdout so I can observe the
> > > behaviour:
> > >
> > > https://gist.github.com/LiamClarkeNZ/b101ce6a42a2e5e1efddfe3a98c5805f
> > >
> > > When this code is run, the window duration is 2 minutes, grace period is
> > 20
> > > seconds, and retention time is 20 minutes.
> > >
> > > I am endeavouring to use event time as the timestamp basis for this
> > process:
> > > https://gist.github.com/LiamClarkeNZ/8265cec02e21f5969e0fedb8281a2180
> > >
> > > So, my sentinel debugging output shows a surprising behaviour in that the
> > > outbound counts for the key always sum higher than the inbound count. For
> > > example:
> > >
> > > Sample: 2020-04-19T07:31:37.492Z
> > >
> > > Inbound
> > > {
> > > 2020-04-19T03:00:00Z=4563,
> > > 2020-04-19T04:00:00Z=5629,
> > > 2020-04-19T05:00:00Z=8489,
> > > 2020-04-19T06:00:00Z=13599
> > > }
> > >
> > > Outbound
> > > {
> > > 2020-04-19T03:00:00Z=4717,
> > > 2020-04-19T04:00:00Z=5890,
> > > 2020-04-19T05:00:00Z=8826,
> > > 2020-04-19T06:00:00Z=13951
> > > }
> > >
> > > This makes me suspect that either I'm not using the window I thought I
> > was
> > > (e.g., I'm somehow using a sliding window instead of a tumbling window)
> > or
> > > that I have made a rookie error somewhere in my aggregations, or I've
> > just
> > > misunderstood something about this. Does it matter that the window size
> > in
> > > the persistent window store doesn't match the windowing time + grace time
> > > in the windowing clause?
> > >
> > > Any pointers gratefully welcome.
> > >
> > > Kind regards,
> > >
> > > Liam Clarke-Hutchinson
> > >
> >
>


Re: Kafka Streams - issues with windowing and suppress

2020-04-19 Thread John Roesler
Hi Liam,

That sounds like a good idea to me. In fact, I’d go so far as to say we should 
just change the existing example to include a grace period, and not bother with 
an extra example. That would put it front and center. 

A PR would be greatly appreciated! Thanks for the offer!

Thanks,
John

On Sun, Apr 19, 2020, at 19:58, Liam Clarke wrote:
> Hi Matthias,
> 
> I think as an interim measure, if the windowing samples in the docs showed
> an additional example where the grace period was set (with perhaps a
> comment about the current default grace period, and planned future
> changes?) it would make it sufficiently visible - happy to submit a PR with
> those changes if it seems appropriate.
> 
> Cheers,
> 
> Liam Clarke-Hutchinson
> 
> On Mon, Apr 20, 2020 at 12:12 PM Matthias J. Sax  wrote:
> 
> > I would prefer to not make the grace-period a mandatory argument and
> > keep the API as-is. I understand the issue of backward compatibility,
> > but I would still argue that we should just change the default grace
> > period to 0 in the 3.0 release. It's a major release and thus it seems
> > to be fine. To prepare for this change, we could start to log a WARN
> > message, if a user does not set the grace period explicitly for now.
> >
> > Just my 2 ct. Thoughts?
> >
> > -Matthias
> >
> > On 4/19/20 7:40 AM, John Roesler wrote:
> > > Oh, man, that’s a good idea.
> > >
> > > I can propose to deprecate (not remove) the existing ‘of’ factory method
> > and add one with a mandatory grace period. Not sure why I didn’t think of
> > that before. Probably too caught up in looking for something “smart”.
> > >
> > > Thanks!
> > > John
> > >
> > > On Sun, Apr 19, 2020, at 02:27, Liam Clarke wrote:
> > >> Hi John,
> > >>
> > >> I can't really think of a way to make it more obvious without breaking
> > >> backwards compatibility - e.g., obvious easy fix is that grace period
> > is a
> > >> mandatory arg to TimeWindows, but that would definitely break
> > compatibility.
> > >>
> > >> Cheers,
> > >>
> > >> Liam Clarke-Hutchinson
> > >>
> > >> On Thu, Apr 16, 2020 at 1:59 AM John Roesler 
> > wrote:
> > >>
> > >>> Boom, you got it, Liam! Nice debugging work.
> > >>>
> > >>> This is a pretty big bummer, but I had to do it that way for
> > >>> compatibility. I added a log message to try and help reduce the risk,
> > but
> > >>> it’s still kind of a trap.
> > >>>
> > >>> I’d like to do a KIP at some point to consider changing the default
> > grace
> > >>> period, but haven’t done it because it’s not clear what the default
> > should
> > >>> be.
> > >>>
> > >>> Please let me know if you have any ideas!
> > >>> Thanks,
> > >>> -John
> > >>>
> > >>>
> > >>> On Tue, Apr 14, 2020, at 23:44, Liam Clarke wrote:
> > >>>> And the answer is to change
> > >>>> .windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
> > >>>> and specify the grace period:
> > >>>>
> > >>>>
> > >>>
> > windowedBy(TimeWindows.of(Duration.ofMillis(5000)).grace(Duration.ofMillis(100)))
> > >>>>
> > >>>> On Wed, Apr 15, 2020 at 4:34 PM Liam Clarke <
> > liam.cla...@adscale.co.nz>
> > >>>> wrote:
> > >>>>
> > >>>>> Okay, doing some debugging it looks like I'm seeing this behaviour
> > >>> because
> > >>>>> it's picking up a grace duration of 86,395,000 ms in
> > >>>>> KTableImpl.buildSuppress, which would happen to be  5000 millis (my
> > >>> window
> > >>>>> size) off 24 hours, so I've got some clues!
> > >>>>>
> > >>>>> On Wed, Apr 15, 2020 at 3:43 PM Liam Clarke <
> > liam.cla...@adscale.co.nz
> > >>>>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hi all,
> > >>>>>>
> > >>>>>> I have a case where I want to consume from a topic, count the number
> > >>> of
> > >>>>>> certain ids in a given time period X, and emit a new record to a
> > >>> different
> > >>>>>> topic after that same time period X has ela

Re: Kafka Streams - issues with windowing and suppress

2020-04-19 Thread John Roesler
Oh, man, that’s a good idea.

I can propose to deprecate (not remove) the existing ‘of’ factory method and 
add one with a mandatory grace period. Not sure why I didn’t think of that 
before. Probably too caught up in looking for something “smart”.

Thanks!
John

On Sun, Apr 19, 2020, at 02:27, Liam Clarke wrote:
> Hi John,
> 
> I can't really think of a way to make it more obvious without breaking
> backwards compatibility - e.g., obvious easy fix is that grace period is a
> mandatory arg to TimeWindows, but that would definitely break compatibility.
> 
> Cheers,
> 
> Liam Clarke-Hutchinson
> 
> On Thu, Apr 16, 2020 at 1:59 AM John Roesler  wrote:
> 
> > Boom, you got it, Liam! Nice debugging work.
> >
> > This is a pretty big bummer, but I had to do it that way for
> > compatibility. I added a log message to try and help reduce the risk, but
> > it’s still kind of a trap.
> >
> > I’d like to do a KIP at some point to consider changing the default grace
> > period, but haven’t done it because it’s not clear what the default should
> > be.
> >
> > Please let me know if you have any ideas!
> > Thanks,
> > -John
> >
> >
> > On Tue, Apr 14, 2020, at 23:44, Liam Clarke wrote:
> > > And the answer is to change
> > > .windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
> > > and specify the grace period:
> > >
> > >
> > windowedBy(TimeWindows.of(Duration.ofMillis(5000)).grace(Duration.ofMillis(100)))
> > >
> > > On Wed, Apr 15, 2020 at 4:34 PM Liam Clarke 
> > > wrote:
> > >
> > > > Okay, doing some debugging it looks like I'm seeing this behaviour
> > because
> > > > it's picking up a grace duration of 86,395,000 ms in
> > > > KTableImpl.buildSuppress, which would happen to be  5000 millis (my
> > window
> > > > size) off 24 hours, so I've got some clues!
> > > >
> > > > On Wed, Apr 15, 2020 at 3:43 PM Liam Clarke  > >
> > > > wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> I have a case where I want to consume from a topic, count the number
> > of
> > > >> certain ids in a given time period X, and emit a new record to a
> > different
> > > >> topic after that same time period X has elapsed containing the
> > aggregated
> > > >> value.
> > > >>
> > > >> I'm using suppress with Suppressed.untilWindowCloses, but nothing is
> > ever
> > > >> emitted, nor is my peek placed after the suppress ever being hit.
> > > >> My code is in the below Gist - I've hardcoded the durations for 5
> > seconds
> > > >> after testing purposes:
> > > >> https://gist.github.com/LiamClarkeNZ/24121ccf0f09e4530749cbd92633fa46
> > > >>
> > > >> I'm assuming I've misunderstood something drastically, and would
> > greatly
> > > >> appreciate a pointer on where I may have gone wrong. I'm wondering if
> > I
> > > >> need a larger retention on the persistent store?
> > > >>
> > > >> I understand that events have to arrive in order for windows to
> > close, so
> > > >> I've sent events after the window has expired to attempt to move the
> > window
> > > >> on, and my first peek (before the suppression) is emitting as I do:
> > > >>
> > > >> 1. 2020-04-15T03:36:48.569Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
> > > >> 1. 2020-04-15T03:37:11.682Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
> > > >> 1. 2020-04-15T03:39:18.882Z aqgzftnvyn - 1
> > > >>
> > > >>
> > > >>  Any guidance greatfully appreciated.
> > > >>
> > > >> Kind regards,
> > > >>
> > > >> Liam Clarke
> > > >>
> > > >
> > >
> >
>


Re: Unexpected behaviour on windowing aggregations

2020-04-19 Thread John Roesler
Hi Liam,

I took a quick look. On the output side, it looks like you’re adding the count 
to the prior count. Should that just set the outbound value to the new count? 
Maybe I misunderstood the situation. 

What I mean is, suppose you get two events for the same window:

Inbound map := 0+1 = 1
Count = 1
Outbound map := 0+1 = 1
(Proposed outbound := 1)

Then,

Inbound map := 1+1 = 2
Count = 2
Outbound map := 1+2 = 3
(Proposed outbound := 2)

Does that make sense?
-John

On Sun, Apr 19, 2020, at 03:08, Liam Clarke wrote:
> Hello all,
> 
> I have been running this code against production data, and I'm emitting
> counts/sums for a sentinel record id to stdout so I can observe the
> behaviour:
> 
> https://gist.github.com/LiamClarkeNZ/b101ce6a42a2e5e1efddfe3a98c5805f
> 
> When this code is run, the window duration is 2 minutes, grace period is 20
> seconds, and retention time is 20 minutes.
> 
> I am endeavouring to use event time as the timestamp basis for this process:
> https://gist.github.com/LiamClarkeNZ/8265cec02e21f5969e0fedb8281a2180
> 
> So, my sentinel debugging output shows a surprising behaviour in that the
> outbound counts for the key always sum higher than the inbound count. For
> example:
> 
> Sample: 2020-04-19T07:31:37.492Z
> 
> Inbound
> {
> 2020-04-19T03:00:00Z=4563,
> 2020-04-19T04:00:00Z=5629,
> 2020-04-19T05:00:00Z=8489,
> 2020-04-19T06:00:00Z=13599
> }
> 
> Outbound
> {
> 2020-04-19T03:00:00Z=4717,
> 2020-04-19T04:00:00Z=5890,
> 2020-04-19T05:00:00Z=8826,
> 2020-04-19T06:00:00Z=13951
> }
> 
> This makes me suspect that either I'm not using the window I thought I was
> (e.g., I'm somehow using a sliding window instead of a tumbling window) or
> that I have made a rookie error somewhere in my aggregations, or I've just
> misunderstood something about this. Does it matter that the window size in
> the persistent window store doesn't match the windowing time + grace time
> in the windowing clause?
> 
> Any pointers gratefully welcome.
> 
> Kind regards,
> 
> Liam Clarke-Hutchinson
>


Re: How to add partitions to an existing kafka topic

2020-04-17 Thread John Roesler
Hi Sachin,

I’m a bit hazy on the details of the broker partition expansion feature. It’s 
been a while since I looked at it. But you actually control the 
key-to-partition mapping at the producer side. The producer’s default 
partitioner just hashes the keys over the partitions, but you could plug in your 
own partitioner to remember which keys it sent to which partitions and do what 
you want. 
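As a very rough sketch of that idea (everything here is hypothetical: the "original.partition.count" config key, the isLegacyKey() lookup, and the assumption that keyed records were previously placed by the default partitioner's murmur2 hash):

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;

public class StickyExpansionPartitioner implements Partitioner {
    // number of partitions the topic had before it was expanded
    private int originalCount;

    @Override
    public void configure(Map<String, ?> configs) {
        originalCount = Integer.parseInt(String.valueOf(configs.get("original.partition.count")));
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // assumes every record is keyed (keyBytes != null)
        if (isLegacyKey(key)) {
            // same formula the default partitioner used before the expansion,
            // so old keys keep landing on their old partitions
            return Utils.toPositive(Utils.murmur2(keyBytes)) % originalCount;
        }
        // new keys spread over the full, expanded partition count
        return Utils.toPositive(Utils.murmur2(keyBytes)) % cluster.partitionsForTopic(topic).size();
    }

    private boolean isLegacyKey(Object key) {
        // hypothetical: check the key against a set/store of pre-expansion source ids
        return false;
    }

    @Override
    public void close() {}
}

You'd plug it in with the producer's partitioner.class config.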

I hope that helps,
John

On Wed, Apr 15, 2020, at 12:22, Sachin Mittal wrote:
> Hi,
> I will look into the suggestions you folks mentioned.
> 
> I was just wondering something from just kafka point of view.
> Lets say we add new partitions to kafka topics. Is there any way to
> configure that only new keys get their messages added to those partitions.
> Existing keys continue to add their messages to previous partitions.
> 
> Or the moment we add new partition the kafka completely re-distributes all
> the older messages too among all the partitions.
> And if it does that then does it ensure that in this re-distributions it
> keeps messages of same key in same partition.
> 
> Thanks
> Sachin
> 
> 
> 
> 
> On Wed, Apr 15, 2020 at 10:19 PM John Roesler  wrote:
> 
> > Hi Sachin,
> >
> > Just to build on Boyang’s answer a little, when designing Kafka’s
> > partition expansion operation, we did consider making it work also for
> > dynamically repartitioning in a way that would work for Streams as well,
> > but it added too much complexity, and the contributor had some other use
> > cases in mind.
> >
> > In Streams, we have some ideas for improving the dynamic scalability, but
> > for now, your best bet is to stop the app and clone the topic in question
> > into a new topic with more partitions, then point the app to the new input
> > topic. Depending on the application, you might also have changelog topics
> > and repartition topics to worry about. The easiest thing is just to reset
> > the app, if you can tolerate it.
> >
> > Iirc, Jan Filipiak has mentioned some techniques or tooling he developed
> > to automate this process. You might search the archives to see what you can
> > dig up. I think it was pretty much what I said above.
> >
> > Hope this helps,
> > John
> >
> > On Wed, Apr 15, 2020, at 10:23, Boyang Chen wrote:
> > > Hey Sachin,
> > >
> > > your observation is correct, unfortunately Kafka Streams doesn't support
> > > adding partitions online. The rebalance could not guarantee the same key
> > > routing to the same partition when the input topic partition changes, as
> > > this is the upstream producer's responsibility to consistently route the
> > > same key data, which is not resolved today.
> > >
> > > Boyang
> > >
> > > On Wed, Apr 15, 2020 at 7:23 AM Sachin Mittal 
> > wrote:
> > >
> > > > Hi,
> > > > We have a kafka streams application which runs multiple instances and
> > > > consumes from a source topic.
> > > > Producers produces keyed messages to this source topic.
> > > > Keyed messages are events from different sources and each source has a
> > > > unique key.
> > > >
> > > > So what essentially happens is that messages from particular source
> > always
> > > > gets added to a particular partition.
> > > > Hence we can run multiple instances of streams application with a
> > > > particular instance processing messages for certain partitions.
> > > > We will never get into a case where messages for a source are
> > processed by
> > > > different instances of streams application simultaneously.
> > > >
> > > > So far so good.
> > > >
> > > > Now over time new sources are added. It may so happen that we reach a
> > > > saturation point and have no option but to increase number of
> > partitions.
> > > >
> > > > So what is the best practice to increase number of partitions.
> > > > Is there a way to ensure that existing key's messages continue to get
> > > > published on same partition as before.
> > > > And only new source's keys gets their messages published on the new
> > > > partition we add.
> > > >
> > > > If this is not possible then does kafka's re-partition mechanism ensure
> > > > that during re-balance all the previous messages of a particular key
> > gets
> > > > moved to same partition.
> > > > I guess under this approach we would have to stop our streaming
> > application
> > > > till re-balance is over otherwise messages for same key may get
> > processed
> > > > by different instances of the application.
> > > >
> > > > Anyway just wanted to know how such a problem is tackled on live
> > systems
> > > > real time, or how some of you have approached the same.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > >
> >
>


Re: How to add partitions to an existing kafka topic

2020-04-15 Thread John Roesler
Hi Sachin,

Just to build on Boyang’s answer a little, when designing Kafka’s partition 
expansion operation, we did consider making it work also for dynamically 
repartitioning in a way that would work for Streams as well, but it added too 
much complexity, and the contributor had some other use cases in mind. 

In Streams, we have some ideas for improving the dynamic scalability, but for 
now, your best bet is to stop the app and clone the topic in question into a 
new topic with more partitions, then point the app to the new input topic. 
Depending on the application, you might also have changelog topics and 
repartition topics to worry about. The easiest thing is just to reset the app, 
if you can tolerate it. 
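If it helps, the "clone" step can be as simple as a one-off copy job like this sketch (topic names are placeholders; the target topic is assumed to already exist with the larger partition count, and you'd stop the job manually once it catches up):

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TopicCloner {
    public static void main(String[] args) {
        Properties cProps = new Properties();
        cProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        cProps.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-cloner");
        cProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        Properties pProps = new Properties();
        pProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Consumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(cProps, new ByteArrayDeserializer(), new ByteArrayDeserializer());
             Producer<byte[], byte[]> producer =
                 new KafkaProducer<>(pProps, new ByteArraySerializer(), new ByteArraySerializer())) {
            consumer.subscribe(Collections.singletonList("source-topic"));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<byte[], byte[]> r : records) {
                    // keyed send: the default partitioner re-hashes keys over the new partition count
                    producer.send(new ProducerRecord<>("target-topic", r.key(), r.value()));
                }
            }
        }
    }
}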

Iirc, Jan Filipiak has mentioned some techniques or tooling he developed to 
automate this process. You might search the archives to see what you can dig 
up. I think it was pretty much what I said above. 

Hope this helps,
John

On Wed, Apr 15, 2020, at 10:23, Boyang Chen wrote:
> Hey Sachin,
> 
> your observation is correct, unfortunately Kafka Streams doesn't support
> adding partitions online. The rebalance could not guarantee the same key
> routing to the same partition when the input topic partition changes, as
> this is the upstream producer's responsibility to consistently route the
> same key data, which is not resolved today.
> 
> Boyang
> 
> On Wed, Apr 15, 2020 at 7:23 AM Sachin Mittal  wrote:
> 
> > Hi,
> > We have a kafka streams application which runs multiple instances and
> > consumes from a source topic.
> > Producers produces keyed messages to this source topic.
> > Keyed messages are events from different sources and each source has a
> > unique key.
> >
> > So what essentially happens is that messages from particular source always
> > gets added to a particular partition.
> > Hence we can run multiple instances of streams application with a
> > particular instance processing messages for certain partitions.
> > We will never get into a case where messages for a source are processed by
> > different instances of streams application simultaneously.
> >
> > So far so good.
> >
> > Now over time new sources are added. It may so happen that we reach a
> > saturation point and have no option but to increase number of partitions.
> >
> > So what is the best practice to increase number of partitions.
> > Is there a way to ensure that existing key's messages continue to get
> > published on same partition as before.
> > And only new source's keys gets their messages published on the new
> > partition we add.
> >
> > If this is not possible then does kafka's re-partition mechanism ensure
> > that during re-balance all the previous messages of a particular key gets
> > moved to same partition.
> > I guess under this approach we would have to stop our streaming application
> > till re-balance is over otherwise messages for same key may get processed
> > by different instances of the application.
> >
> > Anyway just wanted to know how such a problem is tackled on live systems
> > real time, or how some of you have approached the same.
> >
> > Thanks
> > Sachin
> >
>


Re: Kafka Streams - issues with windowing and suppress

2020-04-15 Thread John Roesler
Boom, you got it, Liam! Nice debugging work. 

This is a pretty big bummer, but I had to do it that way for compatibility. I 
added a log message to try and help reduce the risk, but it’s still kind of a 
trap. 

I’d like to do a KIP at some point to consider changing the default grace 
period, but haven’t done it because it’s not clear what the default should be. 

Please let me know if you have any ideas!
Thanks,
-John


On Tue, Apr 14, 2020, at 23:44, Liam Clarke wrote:
> And the answer is to change
> .windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
> and specify the grace period:
>  
> windowedBy(TimeWindows.of(Duration.ofMillis(5000)).grace(Duration.ofMillis(100)))
> 
> On Wed, Apr 15, 2020 at 4:34 PM Liam Clarke 
> wrote:
> 
> > Okay, doing some debugging it looks like I'm seeing this behaviour because
> > it's picking up a grace duration of 86,395,000 ms in
> > KTableImpl.buildSuppress, which would happen to be  5000 millis (my window
> > size) off 24 hours, so I've got some clues!
> >
> > On Wed, Apr 15, 2020 at 3:43 PM Liam Clarke 
> > wrote:
> >
> >> Hi all,
> >>
> >> I have a case where I want to consume from a topic, count the number of
> >> certain ids in a given time period X, and emit a new record to a different
> >> topic after that same time period X has elapsed containing the aggregated
> >> value.
> >>
> >> I'm using suppress with Suppressed.untilWindowCloses, but nothing is ever
> >> emitted, nor is my peek placed after the suppress ever being hit.
> >> My code is in the below Gist - I've hardcoded the durations for 5 seconds
> >> after testing purposes:
> >> https://gist.github.com/LiamClarkeNZ/24121ccf0f09e4530749cbd92633fa46
> >>
> >> I'm assuming I've misunderstood something drastically, and would greatly
> >> appreciate a pointer on where I may have gone wrong. I'm wondering if I
> >> need a larger retention on the persistent store?
> >>
> >> I understand that events have to arrive in order for windows to close, so
> >> I've sent events after the window has expired to attempt to move the window
> >> on, and my first peek (before the suppression) is emitting as I do:
> >>
> >> 1. 2020-04-15T03:36:48.569Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
> >> 1. 2020-04-15T03:37:11.682Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
> >> 1. 2020-04-15T03:39:18.882Z aqgzftnvyn - 1
> >>
> >>
> >>  Any guidance greatfully appreciated.
> >>
> >> Kind regards,
> >>
> >> Liam Clarke
> >>
> >
>


Re: Kafka Streams endless rebalancing

2020-04-10 Thread John Roesler
Hey Alex,

Huh.

Unprefixed configs apply to all consumers, but in this case, it's
irrelevant because only the "main" consumer participates in group
management (so the config only applies to the main consumer).

So you actually have max.poll.interval.ms set to Integer.MAX_VALUE,
which amounts to 25 days? I agree, in that case it doesn't seem like
it could be a slow batch. In fact, it couldn't be anything related to
polling, since you see rebalances sooner than 25 days.

If you have the broker logs, they'll contain the reason for the rebalance.
The only other thing I can think of that causes rebalances is failing to 
heartbeat. What do you have for session.timeout.ms and
heartbeat.interval.ms ?

If anyone else has any ideas, please jump in.

Thanks,
-John

On Fri, Apr 10, 2020, at 14:55, Alex Craig wrote:
> Thanks John, I double-checked my configs and I've actually got the
> max.poll.interval.ms set to the max (not prefixed with anything so
> presumably that’s the “main” consumer).  So I think that means the problem
> isn’t due to a single batch of messages not getting processed/committed
> within the polling cycle right?  I guess what I’m wondering is, could the
> OVERALL length of time needed to fully restore the state stores (which
> could be multiple topics with multiple partitions) be exceeding some
> timeout or threshold?  Thanks again for any ideas,
> 
> 
> 
> Alex C
> 
> 
> On Thu, Apr 9, 2020 at 9:36 AM John Roesler  wrote:
> 
> > Hi Alex,
> >
> > It sounds like your theory is plausible. After a rebalance, Streams needs
> > to restore its stores from the changelog topics. Currently, Streams
> > performs this restore operation in the same loop that does processing and
> > polls the consumer for more records. If the restore batches (or the
> > processing) take too long, Streams won’t be able to call Consumer#poll (on
> > the “main” consumer)within the max.poll.interval, which causes the
> > Consumer’s heartbeat thread to assume the instance is unhealthy and stop
> > sending heartbeats, which in turn causes another rebalance.
> >
> > You could try either adjusting the max poll interval for the _main_
> > consumer or decreasing the batch size for the _restore_ consumer to make
> > sure Streams can call poll() frequently enough to stay in the group. There
> > are prefixes you can add to the consumer configuration portions to target
> > the main or restore consumer.
> >
> > Also worth noting, we’re planning to change this up pretty soon, so that
> > restoration happens in a separate thread and doesn’t block polling like
> > this.
> >
> > I hope this helps!
> > -John
> >
> > On Thu, Apr 9, 2020, at 08:33, Alex Craig wrote:
> > > Hi all, I’ve got a Kafka Streams application running in a Kubernetes
> > > environment.  The topology on this application has 2 aggregations (and
> > > therefore 2 Ktables), both of which can get fairly large – the first is
> > > around 200GB and the second around 500GB.  As with any K8s platform, pods
> > > can occasionally get rescheduled or go down, which of course will cause
> > my
> > > application to rebalance.  However, what I’m seeing is the application
> > will
> > > literally spend hours rebalancing, without any errors being thrown or
> > other
> > > obvious causes for the frequent rebalances – all I can see in the logs is
> > > an instance will be restoring a state store from the changelog topic,
> > then
> > > suddenly it will have its partitions revoked and begin the join-group
> > > process all over again.  (I’m running 10 pods/instances of my app, and I
> > > see this same pattern in each instance)  In some cases it never really
> > > recovers from this rebalancing cycle – even after 12 hours or more - and
> > > I’ve had to scale down the application completely and start over by
> > purging
> > > the application state and re-consuming from earliest on the source topic.
> > > Interestingly, after purging and starting from scratch the application
> > > seems to recover from rebalances pretty easily.
> > >
> > > The storage I’m using is a NAS device, which admittedly is not
> > particularly
> > > fast.  (it’s using spinning disks and is shared amongst other tenants) As
> > > an experiment, I’ve tried switching the k8s storage to an in-memory
> > option
> > > (this is at the k8s layer - the application is still using the same
> > RocksDB
> > > stores) to see if that helps.  As it turns out, I never have the
> > rebalance
> > > problem when using an in-memory persistence layer.  If a pod goes 

Re: Kafka Streams endless rebalancing

2020-04-09 Thread John Roesler
Hi Alex,

It sounds like your theory is plausible. After a rebalance, Streams needs to 
restore its stores from the changelog topics. Currently, Streams performs this 
restore operation in the same loop that does processing and polls the consumer 
for more records. If the restore batches (or the processing) take too long, 
Streams won’t be able to call Consumer#poll (on the “main” consumer) within the 
max.poll.interval, which causes the Consumer’s heartbeat thread to assume the 
instance is unhealthy and stop sending heartbeats, which in turn causes another 
rebalance. 

You could try either adjusting the max poll interval for the _main_ consumer or 
decreasing the batch size for the _restore_ consumer to make sure Streams can 
call poll() frequently enough to stay in the group. There are prefixes you can 
add to the consumer configuration portions to target the main or restore 
consumer. 
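For illustration, the prefixed configs might look like this (the values are placeholders, not recommendations, and 'props' is assumed to be your existing StreamsConfig properties):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

// "main.consumer." prefix: more headroom between poll() calls on the processing loop
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600_000);
// "restore.consumer." prefix: smaller restore batches so each loop iteration finishes sooner
props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 200);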

Also worth noting, we’re planning to change this up pretty soon, so that 
restoration happens in a separate thread and doesn’t block polling like this. 

I hope this helps!
-John

On Thu, Apr 9, 2020, at 08:33, Alex Craig wrote:
> Hi all, I’ve got a Kafka Streams application running in a Kubernetes
> environment.  The topology on this application has 2 aggregations (and
> therefore 2 Ktables), both of which can get fairly large – the first is
> around 200GB and the second around 500GB.  As with any K8s platform, pods
> can occasionally get rescheduled or go down, which of course will cause my
> application to rebalance.  However, what I’m seeing is the application will
> literally spend hours rebalancing, without any errors being thrown or other
> obvious causes for the frequent rebalances – all I can see in the logs is
> an instance will be restoring a state store from the changelog topic, then
> suddenly it will have its partitions revoked and begin the join-group
> process all over again.  (I’m running 10 pods/instances of my app, and I
> see this same pattern in each instance)  In some cases it never really
> recovers from this rebalancing cycle – even after 12 hours or more - and
> I’ve had to scale down the application completely and start over by purging
> the application state and re-consuming from earliest on the source topic.
> Interestingly, after purging and starting from scratch the application
> seems to recover from rebalances pretty easily.
> 
> The storage I’m using is a NAS device, which admittedly is not particularly
> fast.  (it’s using spinning disks and is shared amongst other tenants) As
> an experiment, I’ve tried switching the k8s storage to an in-memory option
> (this is at the k8s layer - the application is still using the same RocksDB
> stores) to see if that helps.  As it turns out, I never have the rebalance
> problem when using an in-memory persistence layer.  If a pod goes down, the
> application spends around 10 - 15 minutes rebalancing and then is back to
> processing data again.
> 
> At this point I guess my main question is: when I’m using the NAS storage
> and the state stores are fairly large, could I be hitting some timeout
> somewhere that isn’t allowing the restore process to complete, which then
> triggers another rebalance?  In other words, the restore process is simply
> taking too long given the amount of data needed to restore and the slow
> storage?   I’m currently using Kafka 2.4.1, but I saw this same behavior in
> 2.3.  I am using a custom RocksDB config setter to limit off-heap memory,
> but I’ve tried removing that and saw no difference in the rebalance
> problem.  Again, no errors that I’m seeing or anything else in the logs
> that seems to indicate why it can never finish rebalancing.  I’ve tried
> turning on DEBUG logging but I’m having a tough time sifting through the
> amount of log messages, though I’m still looking.
> 
> If anyone has any ideas I would appreciate it, thanks!
> 
> Alex C
>


Re: Passing states stores around

2020-03-10 Thread John Roesler
Hi all,

I agree with Sachin.

It makes sense to break out some logic into separate classes, but you need to 
be careful not to leak state from different threads or tasks into each other. 
The easiest way to do this is to just instantiate your utilities in the 
Processor init. Then, you’ll be sure that each processor instance has its own 
independent scope, and you’ll also have the stores available. Don’t forget to 
clean up your utilities in the Processor close if necessary. Processors will be 
closed and re-initialized sometimes, and need to be able to have a clean slate 
in that case. 
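Roughly like this (EnrichmentHelper and the store name are stand-ins for your own classes):

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class MyProcessor implements Processor<String, String> {
    private EnrichmentHelper helper;

    @Override
    public void init(ProcessorContext context) {
        KeyValueStore<String, String> store =
            (KeyValueStore<String, String>) context.getStateStore("my-store");
        // create the utility here, so each task/thread gets its own instance
        // scoped to its own store shard
        helper = new EnrichmentHelper(store);
    }

    @Override
    public void process(String key, String value) {
        helper.handle(key, value);
    }

    @Override
    public void close() {
        // drop anything the helper holds; the processor may be re-initialized later
        helper = null;
    }
}

// stand-in for whatever utility class holds the business logic
class EnrichmentHelper {
    private final KeyValueStore<String, String> store;
    EnrichmentHelper(KeyValueStore<String, String> store) { this.store = store; }
    void handle(String key, String value) { store.put(key, value); }
}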

Hope this helps!
-john

On Tue, Mar 10, 2020, at 01:38, Sachin Mittal wrote:
> Well I can suggest is that you can access state store via processor's
> context.
> Once you have the reference of the state store it can be passed to
> different classes via the same reference.
> 
> 
> On Tue, Mar 10, 2020 at 8:57 AM Navneeth Krishnan 
> wrote:
> 
> > Hi John,
> >
> > I'm using PAPI to create my topology which has 5 process functions. Out
> > which 3 are large functions (more than 1000 lines of code) and they have
> > about 2 KV stores each. Since the code is fairly large per function, I have
> > them split into classes by functionalities. and some method in the call
> > stack would need to access this state. What do you recommend in such
> > scenarios?
> >
> > Thanks
> >
> > On Mon, Mar 9, 2020 at 6:41 PM John Roesler  wrote:
> >
> > > Hi Navneeth,
> > >
> > > This sounds like an unusual use case. Can you provide more information on
> > > why this is required?
> > >
> > > Thanks,
> > > John
> > >
> > > On Mon, Mar 9, 2020, at 12:48, Navneeth Krishnan wrote:
> > > > Hi All,
> > > >
> > > > Any suggestions?
> > > >
> > > > Thanks
> > > >
> > > > On Sat, Mar 7, 2020 at 10:13 AM Navneeth Krishnan <
> > > reachnavnee...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > Is there a recommended way of passing state stores around across
> > > different
> > > > > classes? The problem is state store can be fetched only if you have
> > > access
> > > > > to the context and in most of the scenarios look up to state store
> > > > > somewhere inside another class. I can think of two options. Either
> > add
> > > > > state store to thread local and access it or split the logic into
> > > multiple
> > > > > functions to fit everything inside an operator. But I wanted to check
> > > if
> > > > > there is any better or recommended approach.
> > > > >
> > > > > Thanks
> > > > >
> > > >
> > >
> >
>


Re: Passing states stores around

2020-03-09 Thread John Roesler
Hi Navneeth,

This sounds like an unusual use case. Can you provide more information on why 
this is required?

Thanks,
John

On Mon, Mar 9, 2020, at 12:48, Navneeth Krishnan wrote:
> Hi All,
> 
> Any suggestions?
> 
> Thanks
> 
> On Sat, Mar 7, 2020 at 10:13 AM Navneeth Krishnan 
> wrote:
> 
> > Hi All,
> >
> > Is there a recommended way of passing state stores around across different
> > classes? The problem is state store can be fetched only if you have access
> > to the context and in most of the scenarios look up to state store
> > somewhere inside another class. I can think of two options. Either add
> > state store to thread local and access it or split the logic into multiple
> > functions to fit everything inside an operator. But I wanted to check if
> > there is any better or recommended approach.
> >
> > Thanks
> >
>


Re: KafkaStreams GroupBy with new key. Can I skip repartition?

2020-03-02 Thread John Roesler
Hi, all,

Sorry for the confusion. I didn’t look too closely at it, I was just going by 
the fact that it was listed under the scope of KIP-221.

I agree that the final design of the KIP doesn’t have too much to do with the 
description of KAFKA-4835. Maybe we should remove that ticket from the KIP, and 
also give it a more specific name. 

I’ll ask in the ticket if Levani is also actively working on it, or if he was 
just planning on KIP-221. 

Thanks,
John

On Sun, Mar 1, 2020, at 20:13, Murilo Tavares wrote:
> I agree with Mathias. Can’t see how this KIP/PR helps with the problem
> described in the KAFKA-4835...
> 
> On Sun, Mar 1, 2020 at 2:16 PM Matthias J. Sax  wrote:
> 
> >
> > I don't think that KIP-221 addressed the discussed use case.
> >
> > KIP-221 allows to force a repartitioning manually, while the use case
> > describe in the original email was to suppress/skip a repartitioning ste
> > p.
> >
> > The issue to avoid unnecessary repartitioning came up a few time
> > already and I personally believe it's worth to close this gap. But we
> > would need to do a KIP to introduce some API to allow user to tell
> > Kafka Streams that repartitioning is not necessary.
> >
> > In Apache Flink, there is an operator called
> > `reinterpretAsKeyedStream`. We could introduce something similar.
> >
> > - -Matthias
> >
> >
> > On 3/1/20 4:43 AM, John Roesler wrote:
> > > Hi all,
> > >
> > > The KIP is accepted and implemented already, but is blocked on
> > > code review: https://github.com/apache/kafka/pull/7170
> > >
> > > A quick note on the lack of recent progress... It's completely our
> > > fault, the reviews fell by the wayside during the 2.5.0 release
> > > cycle, and we haven't gotten back to it. The contributor, Levani,
> > > has been exceptionally patient with us and continually kept the PR
> > > up-to-date and mergeable since then.
> > >
> > > If you'd like to help get it across the line, Murilo, maybe you can
> > > give it a review?
> > >
> > > Thanks, John
> > >
> > > On Sat, Feb 29, 2020, at 20:52, Guozhang Wang wrote:
> > >> It is in progress, but I was not the main reviewer of that ticket
> > >> so I cannot say for sure. I saw the last update is on Jan/2019 so
> > >> maybe it's a bit loose now.. If you want to pick it up and revive
> > >> the KIP completion feel free to do so :)
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Fri, Feb 28, 2020 at 5:54 PM Murilo Tavares
> > >>  wrote:
> > >>
> > >>> Guozhang The ticket definitely describes what I’m trying to
> > >>> achieve. And should I be hopeful with the fact it’s in
> > >>> progress? :) Thanks for pointing that out. Murilo
> > >>>
> > >>> On Fri, Feb 28, 2020 at 2:57 PM Guozhang Wang
> > >>>  wrote:
> > >>>
> > >>>> Hi Murilo,
> > >>>>
> > >>>> Would this be helping your case?
> > >>>> https://issues.apache.org/jira/browse/KAFKA-4835
> > >>>>
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>> On Fri, Feb 28, 2020 at 7:01 AM Murilo Tavares
> > >>>>  wrote:
> > >>>>
> > >>>>> Hi I am currently doing a simple KTable
> > >>>>> groupby().aggregate() in
> > >>>> KafkaStreams.
> > >>>>> In the groupBy I do need to select a new key, but I know
> > >>>>> for sure that
> > >>>> the
> > >>>>> new key would still fall in the same partition. Because of
> > >>>>> this, I
> > >>>> believe
> > >>>>> the repartition would not be necessary, but my question is:
> > >>>>> is it
> > >>>> possible
> > >>>>> to do a groupBy, changing the key, and tell KafkaStreams to
> > >>>>> not create
> > >>>> the
> > >>>>> repartition topic? Thanks Murilo
> > >>>>>
> > >>>>
> > >>>>
> > >>>> -- -- Guozhang
> > >>>>
> > >>>
> > >>
> > >>
> > >> -- -- Guozhang
> > >>
> >
>


Re: KafkaStreams GroupBy with new key. Can I skip repartition?

2020-02-29 Thread John Roesler
Hi all,

The KIP is accepted and implemented already, but is blocked on code
review: https://github.com/apache/kafka/pull/7170

A quick note on the lack of recent progress... It's completely our fault,
the reviews fell by the wayside during the 2.5.0 release cycle, and we
haven't gotten back to it. The contributor, Levani, has been exceptionally
patient with us and continually kept the PR up-to-date and mergeable
since then.

If you'd like to help get it across the line, Murilo, maybe you can give it
a review?

Thanks,
John

On Sat, Feb 29, 2020, at 20:52, Guozhang Wang wrote:
> It is in progress, but I was not the main reviewer of that ticket so I
> cannot say for sure. I saw the last update is on Jan/2019 so maybe it's a
> bit loose now.. If you want to pick it up and revive the KIP completion
> feel free to do so :)
> 
> 
> Guozhang
> 
> 
> On Fri, Feb 28, 2020 at 5:54 PM Murilo Tavares  wrote:
> 
> > Guozhang
> > The ticket definitely describes what I’m trying to achieve.
> > And should I be hopeful with the fact it’s in progress? :)
> > Thanks for pointing that out.
> > Murilo
> >
> > On Fri, Feb 28, 2020 at 2:57 PM Guozhang Wang  wrote:
> >
> > > Hi Murilo,
> > >
> > > Would this be helping your case?
> > > https://issues.apache.org/jira/browse/KAFKA-4835
> > >
> > >
> > > Guozhang
> > >
> > > On Fri, Feb 28, 2020 at 7:01 AM Murilo Tavares 
> > > wrote:
> > >
> > > > Hi
> > > > I am currently doing a simple KTable groupby().aggregate() in
> > > KafkaStreams.
> > > > In the groupBy I do need to select a new key, but I know for sure that
> > > the
> > > > new key would still fall in the same partition. Because of this, I
> > > believe
> > > > the repartition would not be necessary, but my question is: is it
> > > possible
> > > > to do a groupBy, changing the key, and tell KafkaStreams to not create
> > > the
> > > > repartition topic?
> > > > Thanks
> > > > Murilo
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> 
> 
> -- 
> -- Guozhang
>


Re: subscribe kafka user mail

2020-02-27 Thread John Roesler
Hi there! To subscribe to the list, you have to email a different address: 
users-subscr...@kafka.apache.org. (see https://kafka.apache.org/contact.html).

This also applies to the message you sent to dev (should have been 
dev-subscr...@kafka.apache.org).

Thanks for joining the conversation!
-John

On Thu, Feb 27, 2020, at 08:36, Walker Xia wrote:
> subscribe kafka user mail
>


Re: [ANNOUNCE] New committer: Konstantine Karantasis

2020-02-26 Thread John Roesler
Congrats, Konstantine! Awesome news.
-John

On Wed, Feb 26, 2020, at 16:39, Bill Bejeck wrote:
> Congratulations Konstantine! Well deserved.
> 
> -Bill
> 
> On Wed, Feb 26, 2020 at 5:37 PM Jason Gustafson  wrote:
> 
> > The PMC for Apache Kafka has invited Konstantine Karantasis as a committer
> > and we
> > are pleased to announce that he has accepted!
> >
> > Konstantine has contributed 56 patches and helped to review even more. His
> > recent work includes a major overhaul of the Connect task management system
> > in order to support incremental rebalancing. In addition to code
> > contributions, Konstantine helps the community in many other ways including
> > talks at meetups and at Kafka Summit and answering questions on
> > stackoverflow. He consistently shows good judgement in design and a careful
> > attention to details when it comes to code.
> >
> > Thanks for all the contributions and looking forward to more!
> >
> > Jason, on behalf of the Apache Kafka PMC
> >
>


Re: KTable in Compact Topic takes too long to be updated

2020-02-20 Thread John Roesler
Aha! Thanks, Renato, that's very clear.

I think there's a couple of ways you can model this, but one thing that
applies to all of them is that you should consider the `max.task.idle.ms`
configuration option. If you set it higher than `max.poll.interval.ms`,
then Streams will be able to ensure that it processes records from
multiple inputs in timestamp order (as long as the upstream systems
have already buffered the data). It's not strictly necessary for the algorithms
I'd propose, but it's probably more intuitive, e.g., if you process the "order"
record before the "item" records for that order.
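As a concrete fragment, following the advice above to keep it above the consumer's max.poll.interval.ms (which defaults to 300000 ms); the exact number is just an example:

  final Properties props = new Properties();
  // ... the rest of the Streams configuration ...
  props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 310_000L);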

Offhand, it seems like you could take two high-level approaches here.

One is to collect the items into an "order items" table, keyed by order
id, and _then_ join it with the order table. This is probably more attractive
if you also have other uses for the order table that don't involve the items.
It looks like this:

KStream<String, Item> items = ...
KTable<String, Order> orders = ...

KTable<String, Map<String, Item>> orderItems =
  items
    .groupBy((key, item) -> item.orderID)
    .aggregate(
      () -> new HashMap<>(),
      (orderId, item, itemsForOrder) -> { itemsForOrder.put(item.itemID, item); return itemsForOrder; }
    );

KTable<String, OrderWithItems> result =
  orders.join(
    orderItems,
    (order, itemsForOrder) -> new OrderWithItems(order, itemsForOrder)
  );

Note that this is just a sketch. In reality, you'll have to specify a serde for
the HashMap, etc.


The other idea I had was to treat it as a pure aggregation, by first merging
orders and items into a single stream and just doing the stream aggregation
on it. Something like:

KStream<String, Item> items = ...
KStream<String, Order> orders = ...

KStream<String, Object> itemsByOrderId =
  items
    .selectKey((itemId, item) -> item.orderID)
    .mapValues(item -> (Object) item);

KStream<String, Object> merged =
  orders.mapValues(order -> (Object) order).merge(itemsByOrderId);

KTable<String, OrderWithItems> result =
  merged
    .groupByKey()
    .aggregate(
      () -> new OrderWithItems(),
      (orderId, object, orderWithItems) -> {
        if (object instanceof Order) {
          orderWithItems.setOrder((Order) object);
        } else if (object instanceof Item) {
          orderWithItems.addItem((Item) object);
        } else {
          throw new IllegalArgumentException("Unexpected value: " + object);
        }
        return orderWithItems;
      }
    );

I think the most important consideration between these is just which
one you and your team find more intuitive. They should have about the
same performance characteristics except:
* The first option needs both input KTables to be stored (to compute the join)
* The second option stores just one KTable, consisting of the _result_
The sizes of these two data sets should actually be about the same
*except* that if you _also_ choose to materialize the _result_ of the join
then the first approach would use twice the storage.

But I'd really strongly recommend to favor clear code over efficient code,
unless you know ahead of time that storage efficiency is going to be a
big problem.

I hope this helps!
-John


On Wed, Feb 19, 2020, at 16:19, Renato Melo wrote:
>  Hi John,
> Thank you for your reply.
> 
> Let me clarify.
> 
> I used the word aggregate, but we are using aggregate functions. Our 
> case is a relationship whole-part between messageA and message1, 2, n. 
> Like order and order items.
> 
> So translating our case, messageA is the order and message1 and 2 are items.
> 
> When I said we aggregate, I was trying to say we add item to the order. 
> 
> So we have an order in the KTable.
> 
> When the first item arrives, Kafka Streams joins the item to order.
> 
> Then we add the item to the order, do some calculations, and then a 
> separate Kafka producer pushes the order back to the 
> KTable.
> After the first item we expected this:
> Order (item1)
> Then the second item arrives and Kafka Streams joins item2 to the 
> order in the stream, but the order is not updated yet. So we add 
> item2 to the order and instead of having:
> Order(item1, item2) 
> 
> we have 
> 
> Order(item2)
> I hope I made our scenario clearer.
> Regards,
> 
> Renato de Melo
> 
>Em quarta-feira, 19 de fevereiro de 2020 18:12:15 BRT, John Roesler 
>  escreveu:  
>  
>  Hi Renato,
> 
> Can you describe a little more about the nature of the join+aggregation
> logic? It sounds a little like the KTable represents the result of aggregating
> messages from the KStream?
> 
> If that's the case, the operation you probably wanted was like:
> 
> > KStream.groupBy().aggregate()
> 
> which produces a KTable view of the aggregation result, and also guarantees
> that when processing the second message, you'll see the result of having 
> processed the first.
> 
> Let me know if I've misunderstood.
> 
> Thanks,
> -John
> 
> On Wed, Feb 19, 2020, at 14:03, Renato Melo wrote:
> > Hi Kafka Community,
> > 
> > Please ta

Re: KTable in Compact Topic takes too long to be updated

2020-02-19 Thread John Roesler
Hi Renato,

Can you describe a little more about the nature of the join+aggregation
logic? It sounds a little like the KTable represents the result of aggregating
messages from the KStream?

If that's the case, the operation you probably wanted was like:

> KStream.groupBy().aggregate()

which produces a KTable view of the aggregation result, and also guarantees
that when processing the second message, you'll see the result of having 
processed the first.

Let me know if I've misunderstood.

Thanks,
-John

On Wed, Feb 19, 2020, at 14:03, Renato Melo wrote:
> Hi Kafka Community,
> 
> Please take a look into my use case:
> 
> First message (message1)
> 1. We have a KStream joined to a KTable (compact topic).
> 2. We receive message1 from the KStream and aggregate it into the 
> joined messageA from the KTable. 
> 3. We push messageA, with message1 aggregated into it, back into the KTable.
> 
> Second message (message2)
> 4. Message2 arrives on the KStream and joins to the expected, updated 
> messageA from the KTable. To our surprise, messageA was not yet updated.
> 5. We aggregate message2 into messageA.
> 6. We push messageA to the KTable (compact topic) and the first 
> aggregated message is overwritten.
> 
> Is there a way to speed up the update in the Ktable (Compact Topic)?
> 
> Is something wrong with my use case?
> 
> I do appreciate any help. Thank you in advanced.
> 
> Renato de Melo
> 
> 
>


Re: Using Kafka AdminUtils

2020-02-16 Thread John Roesler
Hi Victoria,

Sorry for the vagueness, I’m not in front of a computer right now, so I can 
only answer from memory. 

I’m not sure why that interface is still tagged “evolving”. Any changes to it 
would go through a deprecation period, just like any public interface in Kafka. 
We should probably remove that annotation. 

Your questions are good ones. It seems like we should update the JavaDoc to 
clarify. My observation is that you do get the exception if the topic already 
exists, and I doubt that this API would change any configuration for 
already-present topics. In my usage, I’ve just caught that exception and 
ignored it. 
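For reference, a sketch of that create-and-ignore pattern with the AdminClient (bootstrap servers, topic name, partition count, and replication factor below are placeholders):

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {
  NewTopic topic = new NewTopic("input-topic", 6, (short) 3);
  try {
    admin.createTopics(Collections.singleton(topic)).all().get();
  } catch (ExecutionException e) {
    if (!(e.getCause() instanceof TopicExistsException)) {
      throw e;
    }
    // The topic is already there; treat it as success and leave its existing config untouched.
  }
}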

Thanks,
John

On Sun, Feb 16, 2020, at 13:12, Victoria Zuberman wrote:
> Hi, John
> 
> Thanks a lot for valuable information.
> I looked at KafkaAdminClient and I see that it offers createTopics 
> method that indeed seems suitable.
> 
> I still have a couple of questions:
> 
> 1. The documentation does not mention what the expected 
> behavior is if the specified topic already exists.
>  Will it fail?
>  Will it throw a TopicExistsException?
> If the topic existed before createTopics was called, will it remain 
> unchanged?
> The behavior is not easily deduced from the KafkaAdminClient code 
> alone, though I did try.
> 
> 2. I see that AdminClient has been supported for a while now, but the API is still 
> marked as Evolving.
> From the version notes it seems that its basic functionality (like 
> createTopics) remains pretty stable.
> Is it considered stable enough for production?
> 
> Thanks,
> Victoria
> 
> 
> On 16/02/2020, 20:15, "John Roesler"  wrote:
> 
> Hi Victoria,
> 
> I’ve used the AdminClient for this kind of thing before. It’s the 
> official java client for administrative actions like creating topics. 
> You can create topics with any partition count, replication, or any 
> other config.
> 
> I hope this helps,
> John
> 
> On Sat, Feb 15, 2020, at 22:41, Victoria Zuberman wrote:
> > Hi,
> >
> > I have an application based on Kafka Streams.
> > It reads from Kafka topic (I call this topic “input topic”).
> > That topic has many partitions and their number varies based on 
> the env
> > in which application is running.
> > I don’t want to create different input topics manually.
> > Configuration of auto.create.topics.enable and num.partitions is 
> not
> > enough for me.
> > The solution I am looking to implement is to check during 
> application
> > init whether the input topic exists and if not to create it with
> > relevant partition number and replication factor.
> >
> > I found the following example that uses kafka.admin.AdminUtils 
> and it
> > seems to be suitable:
> > 
> https://www.codota.com/code/java/methods/kafka.admin.AdminUtils/createTopic
> >
> > Please advise whether using AdminUtils is considered a good 
> practice.
> > Is AdminUtils functionality considered stable and reliable?
> > If there are other solutions, I would appreciate to hear about 
> them.
> >
> > Thanks,
> > Victoria
> >
> >
> 
> 

Re: Using Kafka AdminUtils

2020-02-16 Thread John Roesler
Hi Victoria,

I’ve used the AdminClient for this kind of thing before. It’s the official java 
client for administrative actions like creating topics. You can create topics 
with any partition count, replication, or any other config. 

I hope this helps,
John

On Sat, Feb 15, 2020, at 22:41, Victoria Zuberman wrote:
> Hi,
> 
> I have an application based on Kafka Streams.
> It reads from Kafka topic (I call this topic “input topic”).
> That topic has many partitions and their number varies based on the env 
> in which application is running.
> I don’t want to create different input topics manually.
> Configuration of auto.create.topics.enable and num.partitions is not 
> enough for me.
> The solution I am looking to implement is to check during application 
> init whether the input topic exists and if not to create it with 
> relevant partition number and replication factor.
> 
> I found the following example that uses kafka.admin.AdminUtils and it 
> seems to be suitable:
> https://www.codota.com/code/java/methods/kafka.admin.AdminUtils/createTopic
> 
> Please advise whether using AdminUtils is considered a good practice.
> Is AdminUtils functionality considered stable and reliable?
> If there are other solutions, I would appreciate to hear about them.
> 
> Thanks,
> Victoria
> 
> ---
> NOTICE:
> This email and all attachments are confidential, may be proprietary, 
> and may be privileged or otherwise protected from disclosure. They are 
> intended solely for the individual or entity to whom the email is 
> addressed. However, mistakes sometimes happen in addressing emails. If 
> you believe that you are not an intended recipient, please stop reading 
> immediately. Do not copy, forward, or rely on the contents in any way. 
> Notify the sender and/or Imperva, Inc. by telephone at +1 (650) 
> 832-6006 and then delete or destroy any copy of this email and its 
> attachments. The sender reserves and asserts all rights to 
> confidentiality, as well as any privileges that may apply. Any 
> disclosure, copying, distribution or action taken or omitted to be 
> taken by an unintended recipient in reliance on this message is 
> prohibited and may be unlawful.
> Please consider the environment before printing this email.
>


Re: "Skipping record for expired segment" in InMemoryWindowStore

2020-02-11 Thread John Roesler
y happen. I expect that a window, in order to be written to the
> changelog topic, first needs to go through "put"; so even if it's mixed on
> the input side, it should be skipped if expired at the moment of "put"
> (relatively to observedStreamTime) and on restoration everything should be
> fine.
> 
> As the next step, I would like to list/inspect records and their timestamps
> from given partition of the changelog topic via a command line tool (or in
> some other way) - to confirm if they are really stored this way. If you
> have a tip on how to do it, please let me know.
> 
> That is all I have for now. I would like to resolve it. I will post it here
> if I come up with something new.
> 
> Thank you
> Jiri
> 
> 
> 
> On Mon, Feb 10, 2020 at 10:14 PM John Roesler  wrote:
> >
> > Hey all,
> >
> > Sorry for the confusion. Bruno set me straight offline.
> >
> > Previously, we had metrics for each reason for skipping records, and the
> > rationale was that you would monitor the metrics and only turn to the logs
> > if you needed to *debug* unexpected record skipping. Note that skipping
> > records by itself isn't a cause for concern, since this is exactly what
> Streams
> > is designed to do in a number of situations.
> >
> > However, during the KIP-444 discussion, the decision was reversed, and we
> > decided to just log one "roll-up" metric for all skips and increase the
> log
> > messages to warning level for debuggability. This particularly makes sense
> > because you otherwise would have to restart the application to change the
> > log level if you needed to figure out why the single skipped-record metric
> > is non-zero. And then you may not even observe it again.
> >
> > I either missed the memo on that discussion, or participated in it and
> then
> > forgot it even happened. I'm not sure I want to look back at the thread to
> > find out.
> >
> > Anyway, I've closed the PR I opened to move it back to debug. We should
> > still try to help figure out the root cause of this particular email
> thread,
> > though.
> >
> > Thanks,
> > -John
> >
> > On Mon, Feb 10, 2020, at 12:20, Sophie Blee-Goldman wrote:
> > > While I agree that seems like it was probably a refactoring mistake, I'm
> > > not
> > > convinced it isn't the right thing to do. John, can you reiterate the
> > > argument
> > > for setting it to debug way back when?
> > >
> > > I would actually present this exact situation as an argument for
> keeping it
> > > as
> > > warn, since something indeed seems fishy here that was only surfaced
> > > through this warning. That said, maybe the metric is the more
> appropriate
> > > way to bring attention to this: not sure if it's info or debug level
> > > though, or
> > > how likely it is that anyone really pays attention to it?
> > >
> > > On Mon, Feb 10, 2020 at 9:53 AM John Roesler  wrote:
> > >
> > > > Hi,
> > > >
> > > > I’m sorry for the trouble. It looks like it was a mistake during
> > > >
> > > > https://github.com/apache/kafka/pull/6521
> > > >
> > > > Specifically, while addressing code review comments to change a bunch
> of
> > > > other logs from debugs to warnings, that one seems to have been
> included by
> > > > accident:
> > > >
> https://github.com/apache/kafka/commit/ac27e8578f69d60a56ba28232d7e96c76957f66c
> > > >
> > > > I’ll see if I can fix it today.
> > > >
> > > > Regarding Bruno's thoughts, there was a pretty old decision to
> capture the
> > > > "skipped records" as a metric for visibility and log it at the debug
> level
> > > > for debuggability. We decided that "warning" wasn't the right level
> because
> > > > Streams is operating completely as specified.
> > > >
> > > > However, I do agree that it doesn't seem right to see more skipped
> records
> > > > during start-up; I would expect to see exactly the same records
> skipped
> > > > during start-up as during regular processing, since the skipping
> logic is
> > > > completely deterministic and based on the sequence of timestamps your
> > > > records have in the topic.  Maybe you just notice it more during
> startup?
> > > > I.e., if there are 1000 warning logs spread over a few months, then
> you
> > > > don't notice it, but when you see them all together at start-up, it's more concerning?
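On Jiri's point above about inspecting the records and timestamps in a changelog partition, one way is to read it with a plain consumer; the topic name and bootstrap servers in this sketch are placeholders:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
  TopicPartition tp = new TopicPartition("my-app-my-store-changelog", 0);
  consumer.assign(Collections.singleton(tp));
  consumer.seekToBeginning(Collections.singleton(tp));
  for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(5))) {
    System.out.printf("offset=%d timestamp=%d%n", record.offset(), record.timestamp());
  }
}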

Re: "Skipping record for expired segment" in InMemoryWindowStore

2020-02-10 Thread John Roesler
Hey all,

Sorry for the confusion. Bruno set me straight offline.

Previously, we had metrics for each reason for skipping records, and the
rationale was that you would monitor the metrics and only turn to the logs
if you needed to *debug* unexpected record skipping. Note that skipping
records by itself isn't a cause for concern, since this is exactly what Streams
is designed to do in a number of situations.

However, during the KIP-444 discussion, the decision was reversed, and we
decided to just log one "roll-up" metric for all skips and increase the log
messages to warning level for debuggability. This particularly makes sense
because you otherwise would have to restart the application to change the
log level if you needed to figure out why the single skipped-record metric
is non-zero. And then you may not even observe it again.

I either missed the memo on that discussion, or participated in it and then
forgot it even happened. I'm not sure I want to look back at the thread to
find out.

Anyway, I've closed the PR I opened to move it back to debug. We should
still try to help figure out the root cause of this particular email thread,
though.

Thanks,
-John

On Mon, Feb 10, 2020, at 12:20, Sophie Blee-Goldman wrote:
> While I agree that seems like it was probably a refactoring mistake, I'm
> not
> convinced it isn't the right thing to do. John, can you reiterate the
> argument
> for setting it to debug way back when?
> 
> I would actually present this exact situation as an argument for keeping it
> as
> warn, since something indeed seems fishy here that was only surfaced
> through this warning. That said, maybe the metric is the more appropriate
> way to bring attention to this: not sure if it's info or debug level
> though, or
> how likely it is that anyone really pays attention to it?
> 
> On Mon, Feb 10, 2020 at 9:53 AM John Roesler  wrote:
> 
> > Hi,
> >
> > I’m sorry for the trouble. It looks like it was a mistake during
> >
> > https://github.com/apache/kafka/pull/6521
> >
> > Specifically, while addressing code review comments to change a bunch of
> > other logs from debugs to warnings, that one seems to have been included by
> > accident:
> > https://github.com/apache/kafka/commit/ac27e8578f69d60a56ba28232d7e96c76957f66c
> >
> > I’ll see if I can fix it today.
> >
> > Regarding Bruno's thoughts, there was a pretty old decision to capture the
> > "skipped records" as a metric for visibility and log it at the debug level
> > for debuggability. We decided that "warning" wasn't the right level because
> > Streams is operating completely as specified.
> >
> > However, I do agree that it doesn't seem right to see more skipped records
> > during start-up; I would expect to see exactly the same records skipped
> > during start-up as during regular processing, since the skipping logic is
> > completely deterministic and based on the sequence of timestamps your
> > records have in the topic.  Maybe you just notice it more during startup?
> > I.e., if there are 1000 warning logs spread over a few months, then you
> > don't notice it, but when you see them all together at start-up, it's more
> > concerning?
> >
> > Thanks,
> > -John
> >
> >
> > On Mon, Feb 10, 2020, at 10:15, Bruno Cadonna wrote:
> > > Hi,
> > >
> > > I am pretty sure this was intentional. All skipped records log
> > > messages are on WARN level.
> > >
> > > If a lot of your records are skipped on app restart with this log
> > > message on WARN-level, they were also skipped with the log message on
> > > DEBUG-level. You simply did not know about it before. With an
> > > in-memory window store, this message is logged when a window with a
> > > start time older than the current stream time minus the retention
> > > period is put into the window store, i.e., the window is NOT inserted
> > > into the window store. If you get a lot of them on app restart, you
> > > should have a look at the timestamps of your records and the retention
> > > of your window store. If those values do not explain the behavior,
> > > please try to find a minimal example that shows the issue and post it
> > > here on the mailing list.
> > >
> > > On Mon, Feb 10, 2020 at 2:27 PM Samek, Jiří 
> > wrote:
> > > >
> > > > Hi,
> > > >
> > > > in
> > > >
> > https://github.com/apache/kafka/commit/9f5a69a4c2d6ac812ab6134e64839602a0840b87#diff-a5cfe68a5931441eff5f00261653dd10R134
> > > >
> > > > log level of "Skipping record for expired segment" was changed from
> > debug
> > > > to warn. Was it intentional change? Should it be somehow handled by
> > user?
> > > > How can user handle it? I am getting a lot of these on app restart.
> > >
> >
>


Re: "Skipping record for expired segment" in InMemoryWindowStore

2020-02-10 Thread John Roesler
Hi,

I’m sorry for the trouble. It looks like it was a mistake during

https://github.com/apache/kafka/pull/6521

Specifically, while addressing code review comments to change a bunch of other 
logs from debugs to warnings, that one seems to have been included by accident: 
https://github.com/apache/kafka/commit/ac27e8578f69d60a56ba28232d7e96c76957f66c

I’ll see if I can fix it today.

Regarding Bruno's thoughts, there was a pretty old decision to capture the 
"skipped records" as a metric for visibility and log it at the debug level for 
debuggability. We decided that "warning" wasn't the right level because Streams 
is operating completely as specified.

However, I do agree that it doesn't seem right to see more skipped records 
during start-up; I would expect to see exactly the same records skipped during 
start-up as during regular processing, since the skipping logic is completely 
deterministic and based on the sequence of timestamps your records have in the 
topic.  Maybe you just notice it more during startup? I.e., if there are 1000 
warning logs spread over a few months, then you don't notice it, but when you 
see them all together at start-up, it's more concerning?

Thanks,
-John


On Mon, Feb 10, 2020, at 10:15, Bruno Cadonna wrote:
> Hi,
> 
> I am pretty sure this was intentional. All skipped records log
> messages are on WARN level.
> 
> If a lot of your records are skipped on app restart with this log
> message on WARN-level, they were also skipped with the log message on
> DEBUG-level. You simply did not know about it before. With an
> in-memory window store, this message is logged when a window with a
> start time older than the current stream time minus the retention
> period is put into the window store, i.e., the window is NOT inserted
> into the window store. If you get a lot of them on app restart, you
> should have a look at the timestamps of your records and the retention
> of your window store. If those values do not explain the behavior,
> please try to find a minimal example that shows the issue and post it
> here on the mailing list.
> 
> On Mon, Feb 10, 2020 at 2:27 PM Samek, Jiří  wrote:
> >
> > Hi,
> >
> > in
> > https://github.com/apache/kafka/commit/9f5a69a4c2d6ac812ab6134e64839602a0840b87#diff-a5cfe68a5931441eff5f00261653dd10R134
> >
> > log level of "Skipping record for expired segment" was changed from debug
> > to warn. Was it intentional change? Should it be somehow handled by user?
> > How can user handle it? I am getting a lot of these on app restart.
>
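To make the retention rule Bruno describes above concrete, here is a sketch of a windowed count on an in-memory store with one hour of retention; with a topology like this, a put for a window that starts more than an hour before the observed stream time is exactly the case that produces the "Skipping record for expired segment" message. The store name, window size, grace, and retention are arbitrary example values:

builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
    .count(Materialized.<String, Long>as(
            Stores.inMemoryWindowStore("counts", Duration.ofHours(1), Duration.ofMinutes(5), false))
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()));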


Re: Resource based kafka assignor

2020-01-31 Thread John Roesler
Hi Srinivas,

Your approach sounds fine, as long as you don’t need the view of the assignment 
to be strictly consistent. As a rough approximation, it could work. 

On the other hand, if you’re writing a custom assignor, you could consider 
using the SubscriptionInfo field of the joinGroup request to encode arbitrary 
information from the members to the leader, which it can use in making 
decisions. So you could encode a custom “node id” there and not have to rely on 
patterns in the group id. Or you could even just directly encode node load 
information and use that to influence the assignment. 
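As a bare-bones illustration of that idea, the sketch below only shows the metadata plumbing; the class name, the NODE_ID environment variable, and the balancing logic itself are assumptions, not an existing Kafka feature:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;

public class NodeAwareAssignor implements ConsumerPartitionAssignor {

  private final String nodeId = System.getenv().getOrDefault("NODE_ID", "unknown");

  @Override
  public String name() {
    return "node-aware";
  }

  // Each member ships its node id to the group leader inside its JoinGroup subscription.
  @Override
  public ByteBuffer subscriptionUserData(final Set<String> topics) {
    return ByteBuffer.wrap(nodeId.getBytes(StandardCharsets.UTF_8));
  }

  @Override
  public GroupAssignment assign(final Cluster metadata, final GroupSubscription groupSubscription) {
    // The leader reads every member's node id (or load info) back out of its subscription ...
    final Map<String, String> nodeByMember = new HashMap<>();
    groupSubscription.groupSubscription().forEach((memberId, subscription) -> {
      final ByteBuffer data = subscription.userData();
      nodeByMember.put(memberId,
          data == null ? "unknown" : StandardCharsets.UTF_8.decode(data).toString());
    });
    // ... and factors it into the assignment. The actual balancing logic is omitted here.
    final Map<String, Assignment> assignments = new HashMap<>();
    return new GroupAssignment(assignments);
  }
}

Consumers would opt in with partition.assignment.strategy set to this class's fully qualified name.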

Iirc, there’s no explicit “trigger rebalance” command, but you can still make 
it happen by doing something like unsubscribing and resubscribing again. 
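That trick is just two calls on the consumer (assuming topics and rebalanceListener are the ones already in use):

  consumer.unsubscribe();                        // leave the group ...
  consumer.subscribe(topics, rebalanceListener); // ... and rejoin; the rebalance happens on the next poll()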

I hope this helps!
John

On Thu, Jan 30, 2020, at 09:25, Devaki, Srinivas wrote:
> Also, want to clarify one more doubt,
> 
> is there any way for the client to explicitly trigger a rebalance without
> dying itself?
> 
> On Thu, Jan 30, 2020 at 7:54 PM Devaki, Srinivas 
> wrote:
> 
> > Hi All,
> >
> > We have a set of logstash consumer groups running on the same set of
> > instances; we have decided to run separate consumer groups subscribing to
> > multiple topics instead of running a single consumer group for all topics (the
> > reasoning behind this decision is how our elasticsearch cluster
> > is designed).
> >
> > Since we are running multiple consumer groups, sometimes we have detected
> > that a few ec2 nodes are receiving multiple high-throughput topics in
> > different consumer groups, which was expected based on the implementation
> > of the round-robin assignor.
> >
> > So I've decided to make a partition assignor that will decide the
> > assignment based on the other consumer groups' assignments.
> >
> > Could you please give me some pointers on how to proceed. This is my
> > initial ideas on the problem.
> >
> > Solution #0:
> > write an assignor and use a specific consumer id pattern across all
> > consumer groups; in the assignor, describe all consumer groups and,
> > based on the topic throughput and the other consumer groups' assignments,
> > decide the assignment of this topic.
> >
> >
> >
> >
> > Thanks
> >
>


Re: stop

2020-01-22 Thread John Roesler
Hey Sowjanya,

That won't work. The "welcome" email you got when you signed up for the mailing 
list has instructions for unsubscribing:

> To remove your address from the list, send a message to:
>  

Cheers,
-John

On Wed, Jan 22, 2020, at 10:12, Sowjanya Karangula wrote:
> stop
>


Re: Does Merging two kafka-streams preserve co-partitioning

2020-01-20 Thread John Roesler
Hi Yair,

You should be fine! 

Merging does preserve copartitioning.

Also processing on that partition is single-threaded, so you don’t have to 
worry about races on the same key in your transformer.

Actually, you might want to use transformValues to inform Streams that you 
haven’t changed the key. Otherwise, it would need to repartition the result 
before you could do further stateful processing. 
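For example, a sketch of the same logic with transformValues (the stream, store, and topic names here are partly taken from and partly invented around the question above; transformValues cannot drop records itself, so dropped ones are marked with null and filtered right after):

KStream<Long, Data> merged = automaticStream.merge(manualStream);

KStream<Long, Data> combined = merged
    .transformValues(() -> new ValueTransformerWithKey<Long, Data, Data>() {
      private KeyValueStore<Long, byte[]> store;

      @SuppressWarnings("unchecked")
      @Override
      public void init(final ProcessorContext context) {
        // the store must have been registered on the builder with addStateStore()
        store = (KeyValueStore<Long, byte[]>) context.getStateStore("manual-overrides");
      }

      @Override
      public Data transform(final Long key, final Data value) {
        if (value.getIsManual()) {
          store.put(key, new byte[0]);
          return value;
        }
        return store.get(key) == null ? value : null;  // already overridden: drop below
      }

      @Override
      public void close() {}
    }, "manual-overrides")
    .filter((key, value) -> value != null);

combined.to("combined-changelog", Produced.with(Serdes.Long(), dataSerde));

Because the key is never changed, Streams will not insert a repartition topic before any later stateful step.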

I hope this helps!

Thanks,
John

On Mon, Jan 20, 2020, at 05:27, Yair Halberstadt wrote:
> Hi
> I asked this question on stack-overflow and was wondering if anyone here
> could answer it:
> https://stackoverflow.com/questions/59820243/does-merging-two-kafka-streams-preserve-co-partitioning
> 
> 
> I have 2 co-partitioned kafka topics. One contains automatically generated
> data, and the other manual overrides.
> 
> I want to merge them and filter out any automatically generated data that
> has already been manually overidden, and then forward everything to a
> combined ChangeLog topic.
> 
> To do so I create a stream from each topic, and [merge the streams](
> https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/KStream.html#merge-org.apache.kafka.streams.kstream.KStream-)
> using the dsl API.
> 
> I then apply the following transform, which stores any manual data, and
> deletes any automatic data which has already been manually overidden:
> (Scala but should be pretty easy to understand if you know java)
> 
> ```scala
> class FilterManuallyClassifiedTransformer(manualOverridesStoreName : String)
>   extends Transformer[Long, Data, KeyValue[Long, Data]] {
> 
>   // Array[Byte] used as dummy value since we don't use the value
>   var store: KeyValueStore[Long, Array[Byte]] = _
> 
>   override def init(context: ProcessorContext): Unit = {
> store = context.getStateStore(manualOverridesStoreName
> ).asInstanceOf[KeyValueStore[Long, Array[Byte]]]
>   }
> 
>   override def close(): Unit = {}
> 
>   override def transform(key: Long, value: Data): KeyValue[Long, Data] = {
> if (value.getIsManual) {
>   store.put(key, Array.emptyByteArray)
>   new KeyValue(key, value)
> }
> else if (store.get(key) == null) {
>   new KeyValue(key, value)
> }
> else {
>   null
> }
>   }
> }
> ```
> 
> If I understand correctly, there is no guarantee this will work unless
> manual data and automatic data with the same key are in the same partition.
> Otherwise the manual override might be stored in a different state store to
> the one that the automatic data checks.
> 
> And even if they are stored in the same StateStore there might be race
> conditions, where an automatic data checks the state store, then the manual
> override is added to the state store, then the manual override is written
> to the output topic, then the automatic data is written to the output
> topic, leading to the automatic data overwriting the manual override.
> 
> Is that correct?
> 
> And if so will `merge` preserve the co-partitioning guarantee I need?
> 
> Thanks for your help
>


Re: KTable Suppress not working

2020-01-19 Thread John Roesler
Hi Sushrut,

I have to confess I don’t think I fully understand your last message, but I 
will try to help.

It sounds like maybe you’re thinking that streams would just repeatedly emit 
everything every commit? That is certainly not the case. If there are only 10 
events in window 1 and 10 in window 2, you would see at most 20 output events, 
regardless of any caching or suppression. That is, if you disable all caches, 
you get one output record ( an updated aggregation result) for each input 
record. Enabling caches only serves to reduce the number. 

I hope this helps,
John


On Sat, Jan 18, 2020, at 08:36, Sushrut Shivaswamy wrote:
> Hey John,
> 
> I tried following the docs here about the configs:
> `streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> 10 * 1024 * 1024L);
> // Set commit interval to 1 second.
> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);`
> https://kafka.apache.org/10/documentation/streams/developer-guide/memory-mgmt
> 
> I'm trying to group events by id by accumulating them in a list and then
> split the aggregated list
> into smaller chunks for processing.
> I have a doubt about when windows expire and how aggregated values are
> flushed out.
> Let's assume in window 1 (W1) 10 records arrived and in window 2 (W2) 10 more
> records arrived for the same key.
> Assuming the cache can hold only 10 records in memory.
> Based on my understanding:
> At T1: 10 records from W1 are flushed
> At T2: 20 records from W1 + W2 are flushed.
> The records from W1 will be duplicated at the next commit time till that
> window expires.
> Is this accurate?
> If it is, can you share any way I can avoid/limit the number of times
> duplicate data is flushed?
> 
> Thanks,
> Sushrut
> 
> 
> 
> 
> 
> 
> On Sat, Jan 18, 2020 at 12:00 PM Sushrut Shivaswamy <
> sushrut.shivasw...@gmail.com> wrote:
> 
> > Thanks John,
> > I'll try increasing the "CACHE_MAX_BYTES_BUFFERING_CONFIG"
> > and "COMMIT_INTERVAL_MS_CONFIG" configurations.
> >
> > Thanks,
> > Sushrut
> >
> > On Sat, Jan 18, 2020 at 11:31 AM John Roesler  wrote:
> >
> >> Ah, I should add, if you actually want to use suppression, or
> >> you need to resolve a similar error message in the future, you
> >> probably need to tweak the batch sizes and/or timeout configs
> >> of the various clients, and maybe the server as well.
> >>
> >> That error message kind of sounds like the server went silent
> >> long enough that the http session expired, or maybe it suffered
> >> a long pause of some kind (GC, de-scheduling, etc.) that caused
> >> the OS to hang up the socket.
> >>
> >> I'm not super familiar with diagnosing these issues; I'm just
> >> trying to point you in the right direction in case you wanted
> >> to directly solve the given error instead of trying something
> >> different.
> >>
> >> Thanks,
> >> -John
> >>
> >> On Fri, Jan 17, 2020, at 23:33, John Roesler wrote:
> >> > Hi Sushrut,
> >> >
> >> > That's frustrating... I haven't seen that before, but looking at the
> >> error
> >> > in combination with what you say happens without suppress makes
> >> > me think there's a large volume of data involved here. Probably,
> >> > the problem isn't specific to suppression, but it's just that the
> >> > interactions on the suppression buffers are pushing the system over
> >> > the edge.
> >> >
> >> > Counterintuitively, adding Suppression can actually increase your
> >> > broker traffic because the Suppression buffer has to provide resiliency
> >> > guarantees, so it needs its own changelog, even though the aggregation
> >> > immediately before it _also_ has a changelog.
> >> >
> >> > Judging from your description, you were just trying to batch more,
> >> rather
> >> > than specifically trying to get "final results" semantics for the window
> >> > results. In that case, you might want to try removing the suppression
> >> > and instead increasing the "CACHE_MAX_BYTES_BUFFERING_CONFIG"
> >> > and "COMMIT_INTERVAL_MS_CONFIG" configurations.
> >> >
> >> > Hope this helps,
> >> > -John
> >> >
> >> > On Fri, Jan 17, 2020, at 22:02, Sushrut Shivaswamy wrote:
> >> > > Hey,
> >> > >
> >> > > I'm building a streams application where I'm trying to aggregate a
> >> stream
> >> 

Re: KTable Suppress not working

2020-01-17 Thread John Roesler
Ah, I should add, if you actually want to use suppression, or
you need to resolve a similar error message in the future, you
probably need to tweak the batch sizes and/or timeout configs
of the various clients, and maybe the server as well.

That error message kind of sounds like the server went silent
long enough that the http session expired, or maybe it suffered
a long pause of some kind (GC, de-scheduling, etc.) that caused
the OS to hang up the socket.

I'm not super familiar with diagnosing these issues; I'm just 
trying to point you in the right direction in case you wanted
to directly solve the given error instead of trying something 
different.

Thanks,
-John

On Fri, Jan 17, 2020, at 23:33, John Roesler wrote:
> Hi Sushrut,
> 
> That's frustrating... I haven't seen that before, but looking at the error
> in combination with what you say happens without suppress makes
> me think there's a large volume of data involved here. Probably,
> the problem isn't specific to suppression, but it's just that the
> interactions on the suppression buffers are pushing the system over
> the edge.
> 
> Counterintuitively, adding Suppression can actually increase your
> broker traffic because the Suppression buffer has to provide resiliency
> guarantees, so it needs its own changelog, even though the aggregation
> immediately before it _also_ has a changelog.
> 
> Judging from your description, you were just trying to batch more, rather
> than specifically trying to get "final results" semantics for the window
> results. In that case, you might want to try removing the suppression
> and instead increasing the "CACHE_MAX_BYTES_BUFFERING_CONFIG"
> and "COMMIT_INTERVAL_MS_CONFIG" configurations.
> 
> Hope this helps,
> -John
> 
> On Fri, Jan 17, 2020, at 22:02, Sushrut Shivaswamy wrote:
> > Hey,
> > 
> > I'm building a streams application where I'm trying to aggregate a stream
> > of events
> > and getting a list of events per key.
> > `eventStream
> > .groupByKey(Grouped.with(Serdes.String(), eventSerde))
> > .windowedBy(TimeWindows.of(Duration.ofMillis(50)).grace(Duration.ofMillis(1)))
> > .aggregate(
> > ArrayList::new, (event, accum) -> {
> > accum.add(event);
> > return accum;
> > })
> > .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
> > .toStream()
> > .map((windowedKey, value) -> new KeyValue<>(windowedKey.key(), value))
> > .map(eventProcessor::processEventsWindow)
> > .to("event-window-chunks-queue", Produced.with(Serdes.String(),
> > eventListSerde))`
> > 
> > As you can see I'm grouping events by key and capturing windowed lists of
> > events for further processing.
> > To be able to process the list of events per key in chunks I added
> > `suppress()`.
> > This does not seem to work though.
> > I get this error multiple times:
> > `Got error produce response with correlation id 5 on topic-partition
> > app-test143-KTABLE-SUPPRESS-STATE-STORE-16-changelog-0, retrying
> > (2147483646 attempts left). Error: NETWORK_EXCEPTION
> > WARN org.apache.kafka.clients.producer.internals.Sender - Received invalid
> > metadata error in produce request on partition
> > shoonya-test143-KTABLE-SUPPRESS-STATE-STORE-16-changelog-0 due to
> > org.apache.kafka.common.errors.NetworkException: The server disconnected
> > before a response was received.. Going to request metadata update now`
> > 
> > When I comment out the suppress() line it works fine but I get a large
> > number of events in a list while processing chunks since it does not
> > suppress already evaluated chunks.
> > Can anyone help me out with what could be happening here?
> > 
> > Regards,
> > Sushrut
> >
>


Re: KTable Suppress not working

2020-01-17 Thread John Roesler
Hi Sushrut,

That's frustrating... I haven't seen that before, but looking at the error
in combination with what you say happens without suppress makes
me think there's a large volume of data involved here. Probably,
the problem isn't specific to suppression, but it's just that the
interactions on the suppression buffers are pushing the system over
the edge.

Counterintuitively, adding Suppression can actually increase your
broker traffic because the Suppression buffer has to provide resiliency
guarantees, so it needs its own changelog, even though the aggregation
immediately before it _also_ has a changelog.

Judging from your description, you were just trying to batch more, rather
than specifically trying to get "final results" semantics for the window
results. In that case, you might want to try removing the suppression
and instead increasing the "CACHE_MAX_BYTES_BUFFERING_CONFIG"
and "COMMIT_INTERVAL_MS_CONFIG" configurations.

Hope this helps,
-John

On Fri, Jan 17, 2020, at 22:02, Sushrut Shivaswamy wrote:
> Hey,
> 
> I'm building a streams application where I'm trying to aggregate a stream
> of events
> and getting a list of events per key.
> `eventStream
> .groupByKey(Grouped.with(Serdes.String(), eventSerde))
> .windowedBy(TimeWindows.of(Duration.ofMillis(50)).grace(Duration.ofMillis(1)))
> .aggregate(
> ArrayList::new, (event, accum) -> {
> accum.add(event);
> return accum;
> })
> .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
> .toStream()
> .map((windowedKey, value) -> new KeyValue<>(windowedKey.key(), value))
> .map(eventProcessor::processEventsWindow)
> .to("event-window-chunks-queue", Produced.with(Serdes.String(),
> eventListSerde))`
> 
> As you can see I'm grouping events by key and capturing windowed lists of
> events for further processing.
> To be able to process the list of events per key in chunks I added
> `suppress()`.
> This does not seem to work though.
> I get this error multiple times:
> `Got error produce response with correlation id 5 on topic-partition
> app-test143-KTABLE-SUPPRESS-STATE-STORE-16-changelog-0, retrying
> (2147483646 attempts left). Error: NETWORK_EXCEPTION
> WARN org.apache.kafka.clients.producer.internals.Sender - Received invalid
> metadata error in produce request on partition
> shoonya-test143-KTABLE-SUPPRESS-STATE-STORE-16-changelog-0 due to
> org.apache.kafka.common.errors.NetworkException: The server disconnected
> before a response was received.. Going to request metadata update now`
> 
> When I comment out the suppress() line it works fine but I get a large
> number of events in a list while processing chunks since it does not
> suppress already evaluated chunks.
> Can anyone help me out with what could be happening here?
> 
> Regards,
> Sushrut
>


Re: [ANNOUNCE] New Kafka PMC Members: Colin, Vahid and Manikumar

2020-01-14 Thread John Roesler
Congrats, Colin, Vahid, and Manikumar! A great accomplishment, reflecting your 
great work.

-John

On Tue, Jan 14, 2020, at 11:33, Bill Bejeck wrote:
> Congrats Colin, Vahid and Manikumar! Well deserved.
> 
> -Bill
> 
> On Tue, Jan 14, 2020 at 12:30 PM Gwen Shapira  wrote:
> 
> > Hi everyone,
> >
> > I'm happy to announce that Colin McCabe, Vahid Hashemian and Manikumar
> > Reddy are now members of Apache Kafka PMC.
> >
> > Colin and Manikumar became committers on Sept 2018 and Vahid on Jan
> > 2019. They all contributed many patches, code reviews and participated
> > in many KIP discussions. We appreciate their contributions and are
> > looking forward to many more to come.
> >
> > Congrats Colin, Vahid and Manikumar!
> >
> > Gwen, on behalf of Apache Kafka PMC
> >
>


Re: designing a streaming task for count and event time difference

2020-01-05 Thread John Roesler
Hey Chris,

Yeah, I think what you’re really looking for is data-driven windowing, which we 
haven’t implemented yet. In lieu of that, you’ll want to build on top of 
session windows. 

What you can do is define an aggregate object similar to what Sachin proposed. 
After the aggregation, you can just filter to only allow results where “open == 
false”. Since you have explicit session end events, I don’t think you need 
suppression. 
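A sketch along those lines, following the plain-aggregation shape of Sachin's pseudocode (the SessionEvent and Session classes, the update() method, the serdes, and the topic names are assumptions):

KStream<String, SessionEvent> events = builder.stream("session-events");

KTable<String, Session> sessions = events
    .groupBy((key, event) -> event.userId)
    .aggregate(
        Session::new,
        (userId, event, session) -> session.update(event),  // sets start, bumps count, closes on the end event, returns the session
        Materialized.with(Serdes.String(), sessionSerde));

sessions.toStream()
    .filter((userId, session) -> session != null && !session.open)
    .to("completed-sessions", Produced.with(Serdes.String(), sessionSerde));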

Hope this helps,
John 

On Sun, Jan 5, 2020, at 06:36, Chris Madge wrote:
> This is great, thank you very much.  I've read more into session 
> windowing and suppression and they seem to fit my needs perfectly.  I'm 
> struggling to find a method of triggering the window to close early 
> when I receive the end event.
> 
> Maybe I could assign a monotonically increasing identifier each time I 
> see a start event, then re-key including that as part of a compound key 
> and session window by that?  I feel like I may be engineering an 
> anti-pattern where there's something much better already built in.
> 
> On 2020/01/04 17:46:40, Sachin Mittal  wrote: 
> > Try something like this:
> > 
> > stream
> >   .groupBy(
> > (key, value) -> value.userId
> >   )
> >   .aggregate(
> > () -> new Session(),
> > (aggKey, newValue, aggValue) -> {
> >   aggValue.userId = newValue.userId
> >   if (newValue.start) {
> > aggValue.start = newValue.start
> > aggValue.duration = 0
> > aggValue.open = true
> >   }
> >   else if (newValue.end) {
> > aggValue.duration = newValue.end - aggValue.start
> > aggValue.close = true
> >   } else {
> > aggValue.count++
> > aggValue.duration = now() - aggValue.start
> >   }
> > }
> >   )
> > 
> > Note you need to have a well defined Session class and Event class with
> > their appropriate serde.
> > So the aggregated stream would have Session with its attributes like start
> > time, duration, count, session open or close.
> > 
> > One thing you need to take care of is that after a session is closed, a new session
> > for the same user can be created again.
> > So you may need to break sessions based on some windowing, or, as a session is
> > closed, store it in some store (be it internal to kafka or some external
> > database) and reset the session object.
> > 
> > Hope this helps.
> > 
> > 
> > 
> > 
> > On Sat, Jan 4, 2020 at 10:02 PM Chris Madge  wrote:
> > 
> > > Hi there,
> > >
> > > It’s my first voyage into stream processing - I’ve tried a few things but
> > > I think I’m struggling to think in the streams way. I wondered if I could
> > > be cheeky and ask if someone could give me some clues as to the correct
> > > design for my first task to get me started?
> > >
> > > I have application events coming in like:
> > >
> > > <user id>,type:start,<event time>
> > > <user id>,type:action,<event time>
> > > <user id>,type:action,<event time>
> > > <user id>,type:action,<event time>
> > > <user id>,type:end,<event time>
> > >
> > > each one represents a single user session.
> > >
> > > I need to output:
> > > <user id>,<count of action events>,<time between start and end event>
> > >
> > > I’m working with event time (specified by the application) and I can’t
> > > trust the application to close sessions/notify gracefully (I’m happy for
> > > those to be thrown out, but cool ideas for alternatives are very 
> > > welcome!).
> > >
> > > Any advice would be much appreciated.
> > >
> > > Chris Madge
> > >
> > 
>


Re: Kafka trunk vs master branch

2019-12-25 Thread John Roesler
Hi Sachin,

Trunk is the basis for development. I’m not sure what master is for, if 
anything. I’ve never used it for anything or even checked it out. 

The numbered release branches are used to develop patch releases.

Releases are created from trunk, PRs should be made against trunk, etc. 

Thanks for asking!
John

On Wed, Dec 25, 2019, at 08:54, Sachin Mittal wrote:
> Hello Folks,
> I just wanted to know what commits go into what branch.
> 
> I see the trunk branch, which seems to be the default and the latest.
> I also see the master branch, which seems a bit behind trunk.
> I also see branches for different versions like 2.2, 2.3 and 2.4, which are also
> actively updated.
> 
> I wanted to know, when forking the kafka repo, which branch one should
> base off of to build from source or do any active development.
> 
> What is the difference between the trunk and master branches?
> Also, are release branches created from the trunk or the master branch?
> 
> Also, when issuing a pull request, which branch should one generally use
> as the target?
> 
> Thanks
> Sachin
>

