Re: Streams/RocksDB: Why Universal Compaction?

2023-07-27 Thread Guozhang Wang
Thanks Colt!

For a library's default configs, I think the principle is that it
should run well out of the box the first time someone plays with it.
So I'm not suggesting we try to make the defaults a generally good
combination for a wide range of production usage, since in that case
most people would prefer to customize rather than blindly accept the
default values anyway. Hence, what I had in mind regarding
"benchmarks" is something light, like running the stateful examples in
our tutorials
(https://github.com/apache/kafka/tree/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples)
and seeing whether the new configs give better performance overall. It
doesn't need to be very comprehensive. If you could help with that
validation, it would be great.


Guozhang

On Wed, Jul 26, 2023 at 8:38 AM Colt McNealy  wrote:
>
> Guozhang,
>
> Thanks for your response. That makes a lot of sense; I can't promise any
> super-formal benchmarks but we will definitely play with the configurations
> you sent and report back within a month about our high-level findings.
>
> For our purposes (a workflow engine), we will mostly monitor workflow
> execution metrics + state store restoration times. But in the interest of a
> formal benchmark that could be included in a KIP—what monitoring software
> tooling and setup environment would you recommend? If it doesn't involve
> writing copious amounts of custom code, perhaps (no promises) my team could
> put something together that's more suitable for a general Streams audience
> rather than just our own internal usage.
>
> Cheers,
> Colt McNealy
>
> *Founder, LittleHorse.dev*
>
>
> On Sun, Jul 23, 2023 at 11:21 AM Guozhang Wang 
> wrote:
>
> > Yeah, I can shed some light here: I chose universal compaction
> > originally because, early in the Kafka Streams journey, there were
> > user reports complaining about its storage amplification. But soon
> > enough (around 2019) I realized that, as an OOTB config, level
> > compaction may be preferable.
> >
> > I had a PR dating back to that time where I suggested changing a bunch
> > of OOTB configs for RocksDB, including the compaction config:
> > https://github.com/apache/kafka/pull/6406/files. Unfortunately it was
> > not merged, since I wanted to run some benchmarks to make sure it had
> > no gotchas but never got the time to do so. In fact, I would be very
> > happy if someone could pick that up, re-examine whether the changes
> > still make sense, and if so drive it through to a merge.
> >
> > Guozhang
> >
> >
> > On Sun, Jul 23, 2023 at 10:29 AM Matthias J. Sax  wrote:
> > >
> > > Do you happen to know?
> > >
> > >
> > >  Forwarded Message 
> > > Subject: Streams/RocksDB: Why Universal Compaction?
> > > Date: Fri, 23 Jun 2023 13:19:36 -0700
> > > From: Colt McNealy 
> > > Reply-To: users@kafka.apache.org
> > > To: users@kafka.apache.org
> > >
> > > Hello there!
> > >
> > > I was wondering if anyone (perhaps an early developer or power-user of
> > > Kafka Streams) knows why the Streams developers made the default setting
> > > for RocksDB compaction "Universal" compaction rather than "Level"
> > > compaction?
> > >
> > > My understanding (in which I am extremely UNconfident) is as follows—
> > >
> > > Supposedly Universal compaction leads to lower write amplification after
> > > compaction finishes. In a run of Universal compaction, all data is
> > > compacted; as per the RocksDB documentation it is possible for temporary
> > > write amplification of up to 2x during this process. There have also been
> > > reports of "write stalls" during this process [1].
> > >
> > > In Level compaction, only certain levels (tiers of SST files) are
> > > compacted at once, meaning that the compaction process is shorter
> > > and less intensive, but that write amplification after compaction
> > > finishes is higher than with universal compaction.
> > >
> > > Can anyone confirm/deny/correct this?
> > >
> > > [1] https://github.com/solana-labs/solana/issues/14586 (not
> > > Streams-related, but it is RocksDB)
> > >
> > > Thanks in advance,
> > > Colt McNealy
> > >
> > > *Founder, LittleHorse.dev*
> > >
> >


Re: Streams/RocksDB: Why Universal Compaction?

2023-07-23 Thread Guozhang Wang
Yeah, I can shed some light here: I chose universal compaction
originally because, early in the Kafka Streams journey, there were
user reports complaining about its storage amplification. But soon
enough (around 2019) I realized that, as an OOTB config, level
compaction may be preferable.

I had a PR dating back to that time where I suggested changing a bunch
of OOTB configs for RocksDB, including the compaction config:
https://github.com/apache/kafka/pull/6406/files. Unfortunately it was
not merged, since I wanted to run some benchmarks to make sure it had
no gotchas but never got the time to do so. In fact, I would be very
happy if someone could pick that up, re-examine whether the changes
still make sense, and if so drive it through to a merge.
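
For anyone who wants to experiment before that PR is revisited, here is a
minimal sketch (not taken from the PR; the tuning values are illustrative
assumptions) of switching the Streams state stores to level compaction via
a custom RocksDBConfigSetter:

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;

public class LevelCompactionConfigSetter implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // Switch from the current Streams default (universal) to level compaction.
        options.setCompactionStyle(CompactionStyle.LEVEL);
        // Illustrative knobs one might tune alongside the compaction style;
        // these are assumptions, not recommended defaults.
        options.setMaxWriteBufferNumber(3);
        options.setWriteBufferSize(16 * 1024 * 1024L);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // No RocksDB objects were allocated above, so there is nothing to release.
    }
}

It would be registered with
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, LevelCompactionConfigSetter.class)
before running any benchmark.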

Guozhang


On Sun, Jul 23, 2023 at 10:29 AM Matthias J. Sax  wrote:
>
> Do you happen to know?
>
>
>  Forwarded Message 
> Subject: Streams/RocksDB: Why Universal Compaction?
> Date: Fri, 23 Jun 2023 13:19:36 -0700
> From: Colt McNealy 
> Reply-To: users@kafka.apache.org
> To: users@kafka.apache.org
>
> Hello there!
>
> I was wondering if anyone (perhaps an early developer or power-user of
> Kafka Streams) knows why the Streams developers made the default setting
> for RocksDB compaction "Universal" compaction rather than "Level"
> compaction?
>
> My understanding (in which I am extremely UNconfident) is as follows—
>
> Supposedly Universal compaction leads to lower write amplification after
> compaction finishes. In a run of Universal compaction, all data is
> compacted; as per the RocksDB documentation it is possible for temporary
> write amplification of up to 2x during this process. There have also been
> reports of "write stalls" during this process [1].
>
> In Level compaction, only certain levels (tiers of SST files) are compacted
> at once, meaning that the compaction process is shorter and less intensive,
> but that write amplification after compaction finishes is higher than with
> universal compaction.
>
> Can anyone confirm/deny/correct this?
>
> [1] https://github.com/solana-labs/solana/issues/14586 (not
> Streams-related, but it is RocksDB)
>
> Thanks in advance,
> Colt McNealy
>
> *Founder, LittleHorse.dev*
>


Re: is exactly-once supported with kafka streams application with external state store like redis

2023-04-03 Thread Guozhang Wang
Hello Pushkar,

Unfortunately it is not supported at the moment; see
https://issues.apache.org/jira/browse/KAFKA-12475 for some details.
Currently the community is working on KAFKA-12549 to support
transactional state stores; once that lands, customized remote stores
will be able to provide additional API implementations in order to
support EOS.

Guozhang

On Mon, Apr 3, 2023 at 5:06 AM Pushkar Deole  wrote:
>
> Hi All,
>
> We are using a streams application with Redis for the state store.
> Redis was mainly considered instead of Kafka state stores because a
> global state store, once updated by one application instance, was
> taking a few milliseconds to reflect the updated state on another
> application instance.
> Now we may need to enable exactly-once semantics, and we are wondering
> whether it would work with the Redis state store, or whether rollbacks
> would still leave stale state in Redis?


Re: [ANNOUNCE] New committer: Stanislav Kozlovski

2023-01-17 Thread Guozhang Wang
Congratulations Stan!

Guozhang

On Tue, Jan 17, 2023 at 3:24 PM Matthias J. Sax  wrote:

> Congrats!
>
> On 1/17/23 1:26 PM, Ron Dagostino wrote:
> > Congratulations, Stan!
> >
> > Ron
> >
> >> On Jan 17, 2023, at 12:29 PM, Mickael Maison 
> wrote:
> >>
> >> Congratulations Stanislav!
> >>
> >>> On Tue, Jan 17, 2023 at 6:06 PM Rajini Sivaram <
> rajinisiva...@gmail.com> wrote:
> >>>
> >>> Congratulations, Stan!
> >>>
> >>> Regards,
> >>>
> >>> Rajini
> >>>
>  On Tue, Jan 17, 2023 at 5:04 PM Tom Bentley 
> wrote:
> 
>  Congratulations!
> 
> > On Tue, 17 Jan 2023 at 16:52, Bill Bejeck  wrote:
> 
> > Congratulations Stan!
> >
> > -Bill
> >
> > On Tue, Jan 17, 2023 at 11:37 AM Bruno Cadonna 
>  wrote:
> >
> >> Congrats Stan!
> >>
> >> Well deserved!
> >>
> >> Best,
> >> Bruno
> >>
> >> On 17.01.23 16:50, Jun Rao wrote:
> >>> Hi, Everyone,
> >>>
> >>> The PMC of Apache Kafka is pleased to announce a new Kafka
> committer
> >>> Stanislav Kozlovski.
> >>>
> >>> Stan has been contributing to Apache Kafka since June 2018. He made
> >> various
> >>> contributions including the following KIPs.
> >>>
> >>> KIP-455: Create an Administrative API for Replica Reassignment
> >>> KIP-412: Extend Admin API to support dynamic application log levels
> >>>
> >>> Congratulations, Stan!
> >>>
> >>> Thanks,
> >>>
> >>> Jun (on behalf of the Apache Kafka PMC)
> >>>
> >>
> >
> 
>


[ANNOUNCE] New Kafka PMC Member: Bruno Cadonna

2022-11-01 Thread Guozhang Wang
Hi everyone,

I'd like to introduce our new Kafka PMC member, Bruno.

Bruno has been a committer since April 2021 and has been very active in
the community. He's a key contributor to Kafka Streams, and also helped
review a lot of horizontal improvements such as Mockito. It is my pleasure
to announce that Bruno has agreed to join the Kafka PMC.

Congratulations, Bruno!

-- Guozhang Wang, on behalf of Apache Kafka PMC


Re: Add to Kafka contributor list

2022-09-05 Thread Guozhang Wang
Hello Vinay,

Thanks for your interest in contributing; I've added you to the list.

Guozhang

On Sun, Sep 4, 2022 at 10:57 PM vinay kumar 
wrote:

> Hi,
>
> Could you add me to Kafka contributor list please?
>
>
> I want to start contributing to Kafka
>
> Jira user : vvakati
>
> Thanks,
> Vinay
>


-- 
-- Guozhang


Re: Add to Contributors list of Kafka

2022-09-05 Thread Guozhang Wang
Hello Nandini,

Thanks for your interest; I've added you to the contributors list.

Guozhang

On Mon, Sep 5, 2022 at 8:16 AM Nandini Anagondi 
wrote:

> Hi,
>
> Can you add Nandini Anagondi to the contributor list.
>
> Thanks,
> Nandini A.
>


-- 
-- Guozhang


Re: leftjoin not working as expected.

2022-08-09 Thread Guozhang Wang
Hello Chad,

Here are a few thoughts off the top of my head: for left joins, we keep
the records received from the left side that have NOT yet found a match
on the right side in a separate temporary store (this behavior was only
recently improved, but since you're already on 3.2.1 it applies to you).
When, later, e.g. a right-hand-side record arrives and finds a match in
the temporary left-hand-side "no-matching-yet" store, we delete the
record from that store and emit the join result. But if no match is
found by the time the join window elapses, we still emit those records
from the "no-matching-yet" store as a (left, null) join result.

In your case, I think the arrival of the second record advances the
inferred stream time. Once that time has advanced, the first record,
originally in the "no-matching-yet" store, can no longer find a match
because its join window has already expired, so we emit that record;
but as I said, when that happens the join code should execute with the
right side as "null". So my question is: when you see the join function
executed with the first record on the left side, is the right side
"null"? If yes, I think that reflects what I'm describing here.
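
To illustrate (a sketch, not your code; topic names, serdes and the 500ms
window are assumptions, and it needs the kafka-streams-test-utils
dependency), the (left, null) result for an unmatched record only shows up
once a later record advances stream time past the join window:

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

public class LeftJoinTimingSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> left =
            builder.stream("left", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> right =
            builder.stream("right", Consumed.with(Serdes.String(), Serdes.String()));

        left.leftJoin(right,
                (l, r) -> l + "/" + r, // r is null for non-joined left records
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(500)),
                StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()))
            .to("out", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "left-join-timing-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> leftIn =
                driver.createInputTopic("left", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out =
                driver.createOutputTopic("out", new StringDeserializer(), new StringDeserializer());

            // First left-side record: nothing is emitted yet, it sits in the
            // "no-matching-yet" store waiting for a right-side match.
            leftIn.pipeInput("k1", "a", Instant.ofEpochMilli(0));
            System.out.println(out.readKeyValuesToList()); // expected: []

            // A later record advances stream time well past the 500ms window,
            // so the first record should now come out as (left, null).
            leftIn.pipeInput("k2", "b", Instant.ofEpochMilli(2_000));
            System.out.println(out.readKeyValuesToList()); // expected: [KeyValue(k1, a/null)]
        }
    }
}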


Guozhang




On Thu, Aug 4, 2022 at 9:29 AM Chad Preisler 
wrote:

> Hello,
>
> I'm doing a stream-to-stream left join. Here is what I am seeing when I test
> the code.
>
> - I write a record to the left side topic. The stream app reads the
> message and the deserializer gets triggered. However, the join is not
> triggered at this time.
>
> - I write another record to the left side topic (different key) and I see
> the deserializer get called for the topic. I see the deserializer gets
> called a second time for a store-changelog topic and it deserializes the
> first record. The leftjoin code is executed for the first record submitted.
> This behavior isn't even consistent. Some records on the left never get
> processed.
>
> Why aren't all the records processed right away, or at all? My join window
> is just 500ms.
>
> I'm using the Kafka 3.2.1 client.
>
> Here is a code snippet of the leftjoin.
>
> KStream partyStream =
> streamsBuilder.stream(PARTY_TOPIC,
> Consumed.with(Serdes.String(), partySerde));
>
> KStream itemListStream =
> streamsBuilder.stream(TODO_ITEMS_LIST_TOPIC, Consumed.with(Serdes.String(),
> itemListSerde));
>
>
> KStream updatedPartyStream =
> partyStream.leftJoin(itemListStream, (party, itemList) -> {
> if (itemList != null) {
> party.setToDoItems(itemList.getToDoItems());
> }
> return party;
> }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(500)),
> StreamJoined.with(Serdes.String(), partySerde,
> itemListSerde));
>
> Thanks,
> Chad
>


-- 
-- Guozhang


[ANNOUNCE] New Kafka PMC Member: A. Sophie Blee-Goldman

2022-08-01 Thread Guozhang Wang
Hi everyone,

I'd like to introduce our new Kafka PMC member, Sophie. She has been a
committer since Oct. 2020 and has been contributing to the community
consistently, especially around Kafka Streams and the Kafka Java consumer.
She has also presented on Kafka Streams at Kafka Summit London this year.
It is my pleasure to announce that Sophie has agreed to join the Kafka PMC.

Congratulations, Sophie!

-- Guozhang Wang, on behalf of Apache Kafka PMC


Re: Consume data-skewed partitions using Kafka-streams causes consumer load balancing issue.

2022-07-13 Thread Guozhang Wang
Hello Ankit,

Kafka Streams' rebalance protocol tries to balance workloads based on the
number of partitions (more specifically, the number of tasks, which is
derived from the input partitions) but not on the number of messages or
bytes, so it unfortunately cannot handle data skew across partitions.

In practice, if a KS app is reading multiple topics, the data skew could
be remedied, since an instance could get the heavy partitions of one
topic while getting light partitions of another. But if your app is
reading only a single topic that has data skew, it's hard to balance the
throughput.


Guozhang





On Thu, Jul 7, 2022 at 7:29 AM ankit Soni 
wrote:

> Hello kafka-users,
>
> I have 50 topics, each with 32 partitions where data is being ingested
> continuously.
>
> Data is being published to these 50 topics externally (which we don't
> control), which causes data skew among the partitions of each topic.
>
> For example: For topic-1, partition-1 contains 100 events, while
> partition-2 can have 10K events and so on for all 50 topics.
>
> *Consuming data from all 50 topics using kafka-stream mechanism,*
>
>- Running 4 consumer instances, all within the same consumer-group.
>- Num of threads per consumer process: 8
>
>
> As data is not evenly distributed among partitions (data-skewed partitions
> across topics), I see that 1 or 2 consumer instances (JVMs) are
> processing/consuming far fewer records than the other 2 instances. My
> guess is that these instances process partitions with less data.
>
> *Can someone help: how can I balance the consumers here (distribute
> consumer workload evenly across the 4 consumer instances)? The expectation
> is that all 4 consumer instances should process approximately the same
> number of events.*
>
> Looking forward to hearing your inputs.
>
> Thanks in advance.
>
> *Ankit.*
>


-- 
-- Guozhang


Re: unsubscribed from all topics when adding a KTable

2022-06-07 Thread Guozhang Wang
Hello Meir,

From the code snippet I cannot find where you added a KTable; it seems
you created a KStream from the source topic and aggregated the stream
into a KTable. Could you show me the code difference between "adding a
KTable" vs. "adding a KStream"?

Anyway, that log line should only happen when `unsubscribe` is explicitly
called on the consumer, which happens in only two cases: 1) the instance
is shutting down (potentially due to an exception), or 2) the instance is
handling a task-migrated exception. In either case you should see other
log lines at INFO/WARN level indicating which case it is. I suspect your
code has something that throws an exception right upon starting up and
causes it to shut down (i.e. case 1), but that should be easy to confirm
from the other log lines.


Guozhang



On Fri, May 27, 2022 at 2:31 PM Meir Goldenberg 
wrote:

> Hi,
>
>
> I'm trying to write a very basic Kafka streams consumer in Java.
> Once I add a KTable, I see a message in the server log that I have been
> unsubscribed from all topics.
> Doing the same with a KStream instead of KTable works fine for me.
>
> I'm using Kafka version 3.2.0 (kafka_2.13-3.2.0) and am running on
> raspbian OS.
>
> I tried modifying the group.initial.rebalance.delay.ms in the server
> properties but this did not help.
>
> The message I get in the server log is:
>
> [2022-05-28 00:29:43,989] INFO [GroupCoordinator 0]: Dynamic member with
> unknown member id joins group streams-wiki-created-table in Empty state.
> Created a new member id
> streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e
> and request the member to rejoin with this id.
> (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,055] INFO [GroupCoordinator 0]: Preparing to
> rebalance group streams-wiki-created-table in state PreparingRebalance with
> old generation 2 (__consumer_offsets-16) (reason: Adding new member
> streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e
> with group instance id None; client reason: rebalance failed due to 'The
> group member needs to have a valid member id before actually entering a
> consumer group.' (MemberIdRequiredException))
> (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,089] INFO [GroupCoordinator 0]: Stabilized group
> streams-wiki-created-table generation 3 (__consumer_offsets-16) with 1
> members (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,458] INFO [GroupCoordinator 0]: Assignment received
> from leader
> streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e
> for group streams-wiki-created-table for generation 3. The group has 1
> members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,955] INFO [GroupCoordinator 0]: Preparing to
> rebalance group streams-wiki-created-table in state PreparingRebalance with
> old generation 3 (__consumer_offsets-16) (reason: Removing member
> streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e
> on LeaveGroup; client reason: the consumer unsubscribed from all topics)
> (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,960] INFO [GroupCoordinator 0]: Group
> streams-wiki-created-table with generation 4 is now empty
> (__consumer_offsets-16) (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,998] INFO [GroupCoordinator 0]: Member
> MemberMetadata(memberId=streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e,
> groupInstanceId=None,
> clientId=streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer,
> clientHost=/127.0.0.1, sessionTimeoutMs=45000, rebalanceTimeoutMs=30,
> supportedProtocols=List(stream)) has left group streams-wiki-created-table
> through explicit `LeaveGroup`; client reason: the consumer unsubscribed
> from all topics (kafka.coordinator.group.GroupCoordinator)
>
>
> My code is as following:
>
>
>
>
>
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> "streams-wiki-created-table");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
> props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>
> TimeWindows monthWindow = TimeWindows.of(Duration.ofDays(30));
> TimeWindows weekWindow = TimeWindows.of(Duration.ofDays(7));
> TimeWindows dayWindow = TimeWindows.of(Duration.ofDays(1));
> TimeWindows hourWindow = TimeWindows.of(Duration.ofHours(1));
>
> StreamsBuilder builder = new StreamsBuilder();
>
> KTable createdPagesUserTypeTable =
> 

Re: What role plays transactional.id after KIP-447?

2022-06-02 Thread Guozhang Wang
I think "commitTransaction" should not throw CommitFailedException. Here
admittedly we are overusing the term "commit" here, as we use it for two
operations: committing the offsets (used for consumer, in either EOS or
ALOS), and committing the transaction. The exception is meant for the
former and would not be expected in `commitTransaction`.


Guozhang

On Thu, Jun 2, 2022 at 5:45 AM Gabriel Giussi 
wrote:

> "I think we may overlooked it in documentation to emphasize that, in case
> 1), it should not expect ProducerFencedException. If so, we can fix the
> javadoc."
>
> IMHO that would be nice, I'm reviewing an existing codebase where we were
> only handling ProducerFencedException, because the javadoc and the method
> signature is explicit only about that, and CommitFailedException is not
> even referenced but falls under the general KafkaException.
> I think this could happen in both sendOffsetsToTransaction and
> commitTransaction right?
>
> Thanks.
>
> El mar, 31 may 2022 a las 14:49, Guozhang Wang ()
> escribió:
>
> > The CommitFailedException should be expected, since the fencing happens
> > at the consumer coordinator. I.e. we can only fence the consumer-producer
> > pair by the consumer's generation; we cannot fence via the producer
> > epoch, since there's no other producer that has just grabbed the same
> > txn.id and bumped the producer epoch.
> >
> > So just to clarify, when the zombie comes back, it could be fenced either
> > when:
> >
> > 1) it tries to complete the ongoing transaction via `sendOffset`, in
> > which case it would see the CommitFailedException. The caller is then
> > responsible for handling the thrown exception, which indicates it has
> > been fenced.
> > 2) it tries to heartbeat in the background thread and gets an
> > InvalidGeneration error code, in which case it triggers onPartitionsLost.
> > The callback implementation is then responsible for handling that case,
> > which again indicates it has been fenced.
> >
> > I think we may have overlooked emphasizing in the documentation that, in
> > case 1), it should not expect ProducerFencedException. If so, we can fix
> > the javadoc.
> >
> >
> >
> >
> > On Tue, May 31, 2022 at 8:26 AM Gabriel Giussi 
> > wrote:
> >
> > > But there is no guarantee that the onPartitionsLost callback will be
> > called
> > > before a zombie producer coming back to life tries to continue with the
> > > transaction, e.g. sending offsets or committing, so I should handle the
> > > exception first and I could directly create a new producer there
> instead
> > of
> > > doing in the callback.
> > > The curious part for me is that I was able to reproduce a case that
> > > simulates a zombie producer that will try to send offsets after a
> > rebalance
> > > but instead of failing with a ProducerFencedException is failing with a
> > > CommitFailedException with this message "Transaction offset Commit
> failed
> > > due to consumer group metadata mismatch: Specified group generation id
> is
> > > not valid.", which makes sense but is not even documented in the
> > > KafkaProducer#sendOffsetsToTransaction.
> > > Is this the expected behaviour or it should fail with a
> > > ProducerFencedException when the generation.id is outdated?
> > > The case I reproduced is like this
> > > 1. Consumer A subscribes to topic X partitions 0 and 1, and starts a
> > > producer with transactional.id = "tid.123"
> > > 2. Consumes message from partition 1 and sends it to another thread to
> be
> > > consumed (so the poll thread is not blocked)
> > > 3. Producer A begins a transaction, sends to output topic and gets
> > blocked
> > > (I'm using a lock here to simulate a long processing) before calling
> > > sendOffsetsToTransaction
> > > 4. Consumer B is created and gets assigned partition 1 (I'm using
> > > CooperativeStickyAssignor) and creates a producer with
> transactional.id
> > =
> > > "tid.456"
> > > 5. Consumer B fetches the same message, processes it and commits the
> > > transaction successfully
> > > 6. Producer A calls sendOffsetsToTransaction (because the lock was
> > > released) and fails with CommitFailedException
> > >
> > > This behaviour reflects what is described here
> > >
> > >
> >
> https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
> > > ,
> > > but I was actually expecting a ProducerFencedException ins

Re: What role plays transactional.id after KIP-447?

2022-05-31 Thread Guozhang Wang
The CommitFailedException should be expected, since the fencing happens at
the consumer coordinator. I.e. we can only fence the consumer-producer pair
by the consumer's generation; we cannot fence via the producer epoch, since
there's no other producer that has just grabbed the same txn.id and bumped
the producer epoch.

So just to clarify, when the zombie comes back, it could be fenced either
when:

1) it tries to complete the ongoing transaction via `sendOffset`, in which
case it would see the CommitFailedException. The caller is then responsible
for handling the thrown exception, which indicates it has been fenced.
2) it tries to heartbeat in the background thread and gets an
InvalidGeneration error code, in which case it triggers onPartitionsLost.
The callback implementation is then responsible for handling that case,
which again indicates it has been fenced.

I think we may have overlooked emphasizing in the documentation that, in
case 1), it should not expect ProducerFencedException. If so, we can fix
the javadoc.
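
For illustration, a minimal sketch (not from the actual javadoc; topic
name, types and the recovery strategy are assumptions) of a KIP-447-style
consume-process-produce step, showing where the two fencing signals above
can surface:

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;

public class Kip447LoopSketch {

    static void processOnce(KafkaConsumer<String, String> consumer,
                            KafkaProducer<String, String> producer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        if (records.isEmpty()) {
            return;
        }
        try {
            producer.beginTransaction();
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
                offsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
            }
            // Case 1) above: an outdated consumer generation surfaces here as
            // CommitFailedException, not as ProducerFencedException.
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // Fencing via transactional.id: this producer is no longer usable.
            producer.close();
            throw e;
        } catch (CommitFailedException e) {
            // Fenced by the consumer group's generation: abort and let the caller
            // rebuild the consumer-producer pair (exact recovery is app-specific).
            producer.abortTransaction();
            throw e;
        }
    }
}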




On Tue, May 31, 2022 at 8:26 AM Gabriel Giussi 
wrote:

> But there is no guarantee that the onPartitionsLost callback will be called
> before a zombie producer coming back to life tries to continue with the
> transaction, e.g. sending offsets or committing, so I should handle the
> exception first and I could directly create a new producer there instead of
> doing in the callback.
> The curious part for me is that I was able to reproduce a case that
> simulates a zombie producer that will try to send offsets after a rebalance
> but instead of failing with a ProducerFencedException is failing with a
> CommitFailedException with this message "Transaction offset Commit failed
> due to consumer group metadata mismatch: Specified group generation id is
> not valid.", which makes sense but is not even documented in the
> KafkaProducer#sendOffsetsToTransaction.
> Is this the expected behaviour or it should fail with a
> ProducerFencedException when the generation.id is outdated?
> The case I reproduced is like this
> 1. Consumer A subscribes to topic X partitions 0 and 1, and starts a
> producer with transactional.id = "tid.123"
> 2. Consumes message from partition 1 and sends it to another thread to be
> consumed (so the poll thread is not blocked)
> 3. Producer A begins a transaction, sends to output topic and gets blocked
> (I'm using a lock here to simulate a long processing) before calling
> sendOffsetsToTransaction
> 4. Consumer B is created and gets assigned partition 1 (I'm using
> CooperativeStickyAssignor) and creates a producer with transactional.id =
> "tid.456"
> 5. Consumer B fetches the same message, processes it and commits the
> transaction successfully
> 6. Producer A calls sendOffsetsToTransaction (because the lock was
> released) and fails with CommitFailedException
>
> This behaviour reflects what is described here
>
> https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
> ,
> but I was actually expecting a ProducerFencedException instead. Does that
> exception only correspond to fencing done by transactional.id?
>
> Thanks
>
> El mar, 24 may 2022 a las 20:30, Guozhang Wang ()
> escribió:
>
> > No problem.
> >
> > The key is that at step 4, when the consumer re-joins it will be aware
> that
> > it has lost its previously assigned partitions and will trigger
> > `onPartitionsLost` on the rebalance callback. And since in your scenario
> > it's a 1-1 mapping from consumer to producer, it means the producer has
> > been fenced and hence should be closed.
> >
> > So in that step 4, the old producer with Client A should be closed within
> > the rebalance callback, and then one can create a new producer to pair
> with
> > the re-joined consumer.
> >
> > On Tue, May 24, 2022 at 1:30 PM Gabriel Giussi 
> > wrote:
> >
> > > Last question, the fencing occurs with the sendOffsetsToTransaction
> which
> > > includes ConsumerGroupMetadata, I guess the generation.id is what
> > matters
> > > here since it is bumped with each rebalance.
> > > But couldn't this happen?
> > > 1. Client A consumes from topic partition P1 with generation.id = 1
> and
> > a
> > > producer associated to it produces to some output topic but a long GC
> > pause
> > > occurs before calling sendOffsetsToTransaction
> > > 2. Client A gets out of sync and becomes a zombie due to session
> timeout,
> > > group rebalanced.
> > > 3. Client B is assigned topic partition P1 with generation.id = 2,
> calls
> > > sendOffsetsToTransaction and commits the txn
> > > 4. Client A is back online and joins again with generation.id = 3
> (

Re: What role plays transactional.id after KIP-447?

2022-05-24 Thread Guozhang Wang
No problem.

The key is that at step 4, when the consumer re-joins it will be aware that
it has lost its previously assigned partitions and will trigger
`onPartitionsLost` on the rebalance callback. And since in your scenario
it's a 1-1 mapping from consumer to producer, it means the producer has
been fenced and hence should be closed.

So in that step 4, the old producer with Client A should be closed within
the rebalance callback, and then one can create a new producer to pair with
the re-joined consumer.
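
To make that concrete, a minimal sketch (the bootstrap address,
transactional.id naming and serializers are assumptions) of closing and
re-creating the producer inside the rebalance callback:

import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;

public class FencingAwareRebalanceListener implements ConsumerRebalanceListener {

    private final String clientId;
    private volatile KafkaProducer<String, String> producer;

    FencingAwareRebalanceListener(String clientId, KafkaProducer<String, String> initialProducer) {
        this.clientId = clientId;
        this.producer = initialProducer;
    }

    KafkaProducer<String, String> currentProducer() {
        return producer;
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Nothing special to do for the 1-1 consumer-producer pairing.
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Cooperative revocation: the consumer is still a valid group member,
        // so the paired producer does not need to change.
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        // Step 4 in the scenario above: the old generation is gone, so the old
        // producer is (or will be) fenced. Close it and pair a fresh one with
        // the re-joined consumer.
        producer.close();
        producer = createProducer();
    }

    private KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-" + clientId);  // assumed naming
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaProducer<String, String> newProducer = new KafkaProducer<>(props);
        newProducer.initTransactions();
        return newProducer;
    }
}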

On Tue, May 24, 2022 at 1:30 PM Gabriel Giussi 
wrote:

> Last question, the fencing occurs with the sendOffsetsToTransaction which
> includes ConsumerGroupMetadata, I guess the generation.id is what matters
> here since it is bumped with each rebalance.
> But couldn't this happen?
> 1. Client A consumes from topic partition P1 with generation.id = 1 and a
> producer associated to it produces to some output topic but a long GC pause
> occurs before calling sendOffsetsToTransaction
> 2. Client A gets out of sync and becomes a zombie due to session timeout,
> group rebalanced.
> 3. Client B is assigned topic partition P1 with generation.id = 2, calls
> sendOffsetsToTransaction and commits the txn
> 4. Client A is back online and joins again with generation.id = 3 (this
> happens in some internal thread)
> 5. The thread that was about to call sendOffsetsToTransaction is scheduled
> and calls sendOffsetsToTransaction with generation.id = 3 which is the
> current one so it won't be fenced.
>
> I'm asking this because we are always asking the current
> consumerGroupMetadata to the consumer object, not the one that was used to
> consume the offsets, like this
> producer.sendOffsetsToTransaction(consumedOffsets,
> consumer.groupMetadata());
>
> Couldn't this return a groupMetadata that has a valid generation.id even
> when it is not the same at the moment of consuming the messages that are
> about to be commited?
>
> I'm sure I'm missing something (probably in step 4) that makes this not a
> possible scenario, but I can't say what it is.
>
> Sorry if the question is too confusing.
>
>
>
>
>
>
> El mar, 24 may 2022 a las 12:49, Guozhang Wang ()
> escribió:
>
> > Hi Gabriel,
> >
> > What I meant is that with KIP-447, the fencing is achieved at the time of
> > committing, using the consumer metadata. If, within a transaction, the
> > producer would always try to commit at least once on behalf of the
> > consumer, AND a zombie of the producer would always come from a zombie of
> > a consumer, then the transaction would be guaranteed to be fenced. But:
> >
> > 1) If within a transaction there's no `sendOffset..` triggered, then
> > fencing still needs to be done by the txn coordinator, and txn.id plays
> > the role here --- I think this is not your scenario.
> > 2) If a consumer may be "represented" by multiple producers, and a zombie
> > producer does not come from a zombie consumer, then we still need the
> > fencing to be done via the txn.id --- this is the scenario I'd like to
> > remind you about. For example, if two producers could be (mistakenly)
> > created with different txn.ids and paired with the same consumer, then
> > the new API in KIP-447 would not fence one of them.
> >
> > Guozhang
> >
> > On Tue, May 24, 2022 at 5:50 AM Gabriel Giussi 
> > wrote:
> >
> > > Hello Guozhang,
> > >
> > > thanks for the response, I have some doubts about the "N-1
> > > producer-consumer" case you mentioned and why I may need to configure
> the
> > > transactional id there and how. Is this a case of N consumers sharing
> the
> > > same producer right?
> > >
> > > My current implementation is creating a consumer per topic (I don't
> > > subscribe to multiple topics from the same consumer) and starting a
> > > producer per consumer, so the relation is 1 consumer/topic => 1
> producer
> > > and the transactional id is set as
> > --.
> > > Do you see any problem with this configuration?
> > >
> > > Thanks again.
> > >
> > > El sáb, 21 may 2022 a las 16:37, Guozhang Wang ()
> > > escribió:
> > >
> > > > Hello Gabriel,
> > > >
> > > > What you're asking is a very fair question :) In fact, for Streams
> > where
> > > > the partition-assignment to producer-consumer pairs are purely
> > flexible,
> > > we
> > > > think the new EOS would not have hard requirement on
> transactional.id:
> > > > https://issues.apache.org/jira/browse/KAFKA-9453
> > > >
>

Re: What role plays transactional.id after KIP-447?

2022-05-24 Thread Guozhang Wang
Hi Gabriel,

What I meant is that with KIP-447, the fencing is achieved at the time of
committing, using the consumer metadata. If, within a transaction, the
producer would always try to commit at least once on behalf of the
consumer, AND a zombie of the producer would always come from a zombie of
a consumer, then the transaction would be guaranteed to be fenced. But:

1) If within a transaction there's no `sendOffset..` triggered, then
fencing still needs to be done by the txn coordinator, and txn.id plays
the role here --- I think this is not your scenario.
2) If a consumer may be "represented" by multiple producers, and a zombie
producer does not come from a zombie consumer, then we still need the
fencing to be done via the txn.id --- this is the scenario I'd like to
remind you about. For example, if two producers could be (mistakenly)
created with different txn.ids and paired with the same consumer, then
the new API in KIP-447 would not fence one of them.

Guozhang

On Tue, May 24, 2022 at 5:50 AM Gabriel Giussi 
wrote:

> Hello Guozhang,
>
> thanks for the response, I have some doubts about the "N-1
> producer-consumer" case you mentioned and why I may need to configure the
> transactional id there and how. Is this a case of N consumers sharing the
> same producer right?
>
> My current implementation is creating a consumer per topic (I don't
> subscribe to multiple topics from the same consumer) and starting a
> producer per consumer, so the relation is 1 consumer/topic => 1 producer
> and the transactional id is set as  --.
> Do you see any problem with this configuration?
>
> Thanks again.
>
> El sáb, 21 may 2022 a las 16:37, Guozhang Wang ()
> escribió:
>
> > Hello Gabriel,
> >
> > What you're asking is a very fair question :) In fact, for Streams where
> > the partition-assignment to producer-consumer pairs are purely flexible,
> we
> > think the new EOS would not have hard requirement on transactional.id:
> > https://issues.apache.org/jira/browse/KAFKA-9453
> >
> > If you implemented the transactional messaging via a DIY producer+consumer
> > though, it depends on how you'd expect the life-time of a producer, e.g. if
> > you do not have a 1-1 producer-consumer mapping then transactional.id is
> > not crucial, but if you have an N-1 producer-consumer mapping then you may
> > still need to configure that id.
> >
> >
> > Guozhang
> >
> >
> >
> > On Fri, May 20, 2022 at 8:39 AM Gabriel Giussi 
> > wrote:
> >
> > > Before KIP-447 I understood the use of transactional.id to prevent us
> > from
> > > zombies introducing duplicates, as explained in this talk
> > > https://youtu.be/j0l_zUhQaTc?t=822.
> > > So in order to get zombie fencing working correctly we should assign
> > > producers with a transactional.id that included the partition id,
> > > something
> > > like -, as shown in this slide
> > > https://youtu.be/j0l_zUhQaTc?t=1047 where processor 2 should use the
> > same
> > > txnl.id A as the process 1 that crashed.
> > > This prevented us from having process 2 consuming the message again and
> > > committing, while process 1 could come back to life and also commit the
> > > pending transaction, hence having duplicate messages being produced. In
> > > this case process 1 will be fenced by having an outdated epoch.
> > >
> > > With KIP-447 we no longer have that potential scenario of two pending
> > > transactions trying to produce and mark a message as committed, because
> > we
> > > won't let process 2 even start the transaction if there is a pending
> one
> > > (basically by not returning any messages since we reject the Offset
> Fetch
> > > if there is a pending transaction for that offset partition). This is
> > > explained in this post
> > >
> > >
> >
> https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
> > >
> > > Having that, I don't see anymore the value of transactional.id or how
> I
> > > should configure it in my producers. The main benefit of KIP-447 is
> that
> > we
> > > no longer have to start one producer per input partition, a quote from
> > the
> > > post
> > > "The only way the static assignment requirement could be met is if each
> > > input partition uses a separate producer instance, which is in fact
> what
> > > Kafka Streams previously relied on. However, this made running EOS
> > > applications much more costly in terms of the client resources and load
> > on
> > > the brokers. A large number of client connections could heavily impact
> > the
> > > stability of brokers and become a waste of resources as well."
> > >
> > > I guess now I can reuse my producer between different input partitions,
> > so
> > > what transactional.id should I assign to it and why should I care,
> isn't
> > > zombie fencing resolved by rejecting offset fetch already?
> > >
> > > Thanks.
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: Request to include me to contributors list

2022-05-23 Thread Guozhang Wang
Thanks for your interest! Please let me know what your id is.

Guozhang

On Sun, May 22, 2022 at 6:05 AM Kumud Kumar Srivatsava Tirupati <
kumudkumartirup...@gmail.com> wrote:

> Hi team,
> I am willing to work on
> https://issues.apache.org/jira/browse/KAFKA-13926 please
> add me to the contributors list so that I can assign the ticket to myself.
>
> *---*
> *Thanks and Regards,*
> *Kumud Kumar Srivatsava Tirupati*
> *Ph : +91-8686073938*
>


-- 
-- Guozhang


Re: What role plays transactional.id after KIP-447?

2022-05-21 Thread Guozhang Wang
Hello Gabriel,

What you're asking is a very fair question :) In fact, for Streams where
the partition-assignment to producer-consumer pairs are purely flexible, we
think the new EOS would not have hard requirement on transactional.id:
https://issues.apache.org/jira/browse/KAFKA-9453

If you implemented the transactional messaging via a DIY producer+consumer
though, it depends on how you'd expect the life-time of a producer, e.g. if
you do not have a 1-1 producer-consumer mapping then transactional.id is
not crucial, but if you have an N-1 producer-consumer mapping then you may
still need to configure that id.


Guozhang



On Fri, May 20, 2022 at 8:39 AM Gabriel Giussi 
wrote:

> Before KIP-447 I understood the use of transactional.id to prevent us from
> zombies introducing duplicates, as explained in this talk
> https://youtu.be/j0l_zUhQaTc?t=822.
> So in order to get zombie fencing working correctly we should assign
> producers with a transactional.id that included the partition id,
> something
> like -, as shown in this slide
> https://youtu.be/j0l_zUhQaTc?t=1047 where processor 2 should use the same
> txnl.id A as the process 1 that crashed.
> This prevented us from having process 2 consuming the message again and
> committing, while process 1 could come back to life and also commit the
> pending transaction, hence having duplicate messages being produced. In
> this case process 1 will be fenced by having an outdated epoch.
>
> With KIP-447 we no longer have that potential scenario of two pending
> transactions trying to produce and mark a message as committed, because we
> won't let process 2 even start the transaction if there is a pending one
> (basically by not returning any messages since we reject the Offset Fetch
> if there is a pending transaction for that offset partition). This is
> explained in this post
>
> https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
>
> Having that, I don't see anymore the value of transactional.id or how I
> should configure it in my producers. The main benefit of KIP-447 is that we
> no longer have to start one producer per input partition, a quote from the
> post
> "The only way the static assignment requirement could be met is if each
> input partition uses a separate producer instance, which is in fact what
> Kafka Streams previously relied on. However, this made running EOS
> applications much more costly in terms of the client resources and load on
> the brokers. A large number of client connections could heavily impact the
> stability of brokers and become a waste of resources as well."
>
> I guess now I can reuse my producer between different input partitions, so
> what transactional.id should I assign to it and why should I care, isn't
> zombie fencing resolved by rejecting offset fetch already?
>
> Thanks.
>


-- 
-- Guozhang


Re: docs fixes

2022-05-18 Thread Guozhang Wang
Thanks for reporting this.

Are you interested in submitting a PR to fix it?

Guozhang

On Wed, May 18, 2022 at 7:26 AM Владимир Савостин 
wrote:

> In section https://kafka.apache.org/quickstart#quickstart_kafkaconnect
> you should change plugin path from lib/connect-file-3.2.0.ja to
> libs/connect-file-3.2.0.ja
> Have a nice day!
>


-- 
-- Guozhang


Re: Kafka Stretch Clusters

2022-05-12 Thread Guozhang Wang
>  Am I correct in assuming
that if the preferred leader is not available, the next replica in the ISR
list is chosen to be the leader?

Yes, that's correct :)
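
As a concrete illustration of the reassign-then-elect-preferred-leader
failover discussed in the quoted message below, here is a hedged
Admin-client sketch (broker ids, topic name and bootstrap address are
hypothetical):

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

public class FailoverToDcBSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical

        TopicPartition p0 = new TopicPartition("my-topic", 0);
        TopicPartition p1 = new TopicPartition("my-topic", 1);

        try (Admin admin = Admin.create(props)) {
            // Put the DC-B brokers (ids 3 and 4 here) first so they become the
            // preferred leaders after the reassignment completes.
            admin.alterPartitionReassignments(Map.of(
                p0, Optional.of(new NewPartitionReassignment(Arrays.asList(3, 4, 1, 2))),
                p1, Optional.of(new NewPartitionReassignment(Arrays.asList(4, 3, 2, 1)))
            )).all().get();

            // Move leadership onto the (new) preferred replicas.
            admin.electLeaders(ElectionType.PREFERRED, Set.of(p0, p1)).partitions().get();
        }
    }
}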

On Wed, May 11, 2022 at 1:15 PM Andrew Otto  wrote:

> Thanks so much Guozhang!
>
> > 1) For the producer -> leader hop, could you save the cross-DC network?
> >  even if your message's partition has to be determined deterministically
> by the key, in operations you can still see if most of your active
> producers
> are from one DC, then configure your topic partitions to be hosted by
> brokers within the same DC. Generally speaking, there are various ways you
> can consider saving this hop from across DCs.
>
> Hm, perhaps something like this?
> If we run the producer in active/standby mode, so that the producer
> application only ever runs in one DC at a time, could we manage the
> preferred leaders via the replica list order during a failover?  Example:
> If DC-A is the 'active' DC, then the producer would run only in DC-A.  We'd
> ensure that each partition's replica list starts with brokers only in DC-A.
>
>
> Let Broker A1 and A2 be in DC-A, and Broker B1 and B2 in DC-B.  partition 0
> and partition 1 have a replication factor of 4.
>
> p0: [A1, A2, B1,B2]
> p1: [A2, A1, B2, B1]
>
> In order to failover to DC-B, we'd reassign the partition replica list to
> put the DC-B brokers first, like:
> p0: [B1, B2, A1,A2]
> p1: [B2, B1, A2, A1]
>
> Then issue a preferred leader election, stop the producer in DC-A, and
> start it in DC-B.
> We'd incur a producer latency hit during the failover process until both
> partition leaders and the producer are in DC-B, but hopefully that will be
> short lived (minutes)?
>
> With follower fetching, this would still allow consumers in either DC to
> read from the closest replica, so it would allow for active/active reads.
> With at least 2 replicas in each DC, rolling broker restarts would
> hopefully still allow consumers to consume from replicas in their local DC.
>
> ---
> Also, a quick question about leader election.  Am I correct in assuming
> that if the preferred leader is not available, the next replica in the ISR
> list is chosen to be the leader?  Or, is it a random selection from any of
> the ISRs? If it is a random selection, then manually optimizing the replica
> list to reduce producer hops probably isn't worth trying, as we'd get the
> producer hops during normal broker maintenance.
>
> Thank you!
>
>
>
>
>
>
>
> On Mon, May 9, 2022 at 6:00 PM Guozhang Wang  wrote:
>
> > Hello Andrew.
> >
> > Just to answer your questions first: yes, in the settings you described,
> > three round-trips between DCs would be incurred; but since the replica
> > fetches can be done in parallel, the latency is not the sum of all the
> > round-trips. But if you stay with 2 DCs only, the number of round-trips
> > depends only on the number of follower replicas that are in a different
> > DC from the leader replica.
> >
> > Jumping out of the question and your described settings, there are a
> > couple of things you can consider for your design:
> >
> > 1) For the producer -> leader hop, could you save the cross-DC network?
> > For example, if your message can potentially go to any partition (such
> > as when it is not keyed), then you can customize your partitioner as a
> > "rack-aware" one that always tries to pick a partition whose leader
> > co-exists within the same DC as the producer client; even if your
> > message's partition has to be determined deterministically by the key,
> > in operations you can still see if most of your active producers are
> > from one DC, then configure your topic partitions to be hosted by
> > brokers within the same DC. Generally speaking, there are various ways
> > you can consider saving this hop from across DCs.
> >
> > 2) For the leader -> follower hop, you can start by first deciding how
> > many cross-DC failures you'd like to tolerate. For example, let's say
> > you have 2N+1 replicas per partition, with N+1 replicas (including the
> > leader) in one DC and N replicas in the other DC. If we can set the acks
> > to N+2, then it means the data is replicated to at least one remote
> > replica before the request returns, and hence the data would not be lost
> > if one whole DC fails, which could be sufficient for many stretch and
> > multi-colo cases. Then in practice, since the cross-colo link usually
> > adds more latency, you'd usually need far fewer than N cross-DC
> > round-trips before satisfyin

Re: Kafka Stretch Clusters

2022-05-09 Thread Guozhang Wang
Hello Andrew.

Just to answer your questions first: yes, in the settings you described,
three round-trips between DCs would be incurred; but since the replica
fetches can be done in parallel, the latency is not the sum of all the
round-trips. But if you stay with 2 DCs only, the number of round-trips
depends only on the number of follower replicas that are in a different
DC from the leader replica.

Jumping out of the question and your described settings, there are a couple
of things you can consider for your design:

1) For the producer -> leader hop, could you save the cross-DC network? For
example, if your message can potentially go to any partition (such as when
it is not keyed), then you can customize your partitioner as a "rack-aware"
one that always tries to pick a partition whose leader co-exists within
the same DC as the producer client; even if your message's partition
has to be determined deterministically by the key, in operations you can
still see if most of your active producers are from one DC, then configure
your topic partitions to be hosted by brokers within the same DC. Generally
speaking, there are various ways you can consider saving this hop from
across DCs.

2) For the leader -> follower hop, you can start by first deciding how many
cross-DC failures you'd like to tolerate. For example, let's say you have
2N+1 replicas per partition, with N+1 replicas (including the leader) in
one DC and N replicas in the other DC. If we can set the acks to N+2, then
it means the data is replicated to at least one remote replica before the
request returns, and hence the data would not be lost if one whole DC
fails, which could be sufficient for many stretch and multi-colo cases.
Then in practice, since the cross-colo link usually adds more latency,
you'd usually need far fewer than N cross-DC round-trips before satisfying
the acks, and your average/p99 latencies would not increase much compared
with having just one cross-DC replica.
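
To make 1) concrete, here is a minimal sketch of such a "rack-aware"
partitioner (the "local.rack" config name and the random fallback are
assumptions, not an existing Kafka config):

import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class LocalDcPartitioner implements Partitioner {

    private String localRack = "";

    @Override
    public void configure(Map<String, ?> configs) {
        Object rack = configs.get("local.rack"); // hypothetical custom producer config
        this.localRack = rack == null ? "" : rack.toString();
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        if (keyBytes == null) {
            // Un-keyed records are free to go anywhere: prefer partitions whose
            // current leader sits in the same rack/DC as this producer.
            List<PartitionInfo> local = partitions.stream()
                .filter(p -> p.leader() != null && localRack.equals(p.leader().rack()))
                .collect(Collectors.toList());
            List<PartitionInfo> candidates = local.isEmpty() ? partitions : local;
            return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size())).partition();
        }
        // Keyed records keep the usual deterministic hashing so per-key ordering holds.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % partitions.size();
    }

    @Override
    public void close() {
    }
}

It would be set on the producer via partitioner.class, and could be paired
with client.rack on consumers so that follower fetching keeps reads local
as well.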


Guozhang


On Mon, May 9, 2022 at 11:58 AM Andrew Otto  wrote:

> Hi all,
>
> I'm evaluating  the feasibility
> of setting up a cross datacenter Kafka 'stretch' cluster at The Wikimedia
> Foundation.
>
> I've found docs here and there, but they are pretty slim.  My
> biggest concern is the fact that while Follower Fetching
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> >
> helps
> with potential consumer latency in a stretch cluster, there is nothing that
> addresses producer latency.  I'd have expected the docs I've read to
> mention this if it was a concern, but I haven't seen it.
>
> Specifically, let's say I'm a producer in DC-A, and I want to produce to
> partition X with acks=all.  Partition X has 3 replicas, on brokers B1 in DC
> A, B2 in DC-A and B3 in DC-B.  Currently, the replica on B3(DC-B) is the
> partition leader.  IIUC, when I produce my message to partition X, that
> message will cross the DC boundary for my produce request to B3(DC-B), then
> back again when replica B1(DC-A) fetches, and also when replica B2(DC-A)
> fetches, for a total of 3 times between DCs.
>
> Questions:
> - Am I correct in understanding that each one of these fetches contributes
> to the ack latency?
>
> - And, as the number of brokers and replica increases, the number of times
> a message crosses the DC (likely) increases too?
>
> - When replicas are promoted to be a partition leader,  producer clients
> will shuffle their connections around, often resulting in them connecting
> to the leader in a remote datacenter. Should I be worried about this
> unpredictability in cross DC network connections and traffic?
>
> I'm really hoping that a stretch cluster will help solve some Multi DC
> streaming app architecture woes, but I'm not so sure the potential issues
> with partition leaders is worth it!
>
> Thanks for any insight y'all have,
> -Andrew Otto
>  Wikimedia Foundation
>


-- 
-- Guozhang


Re: Add to Kafka contributor list

2022-05-05 Thread Guozhang Wang
Thanks Chern for your interest to contribute! I've added you to the
contributors list.


Guozhang

On Thu, May 5, 2022 at 3:04 PM Chern Yih Cheah  wrote:

> Hi,
>
> Could you add me to Kakfa contributor list? I would like to work on
> https://issues.apache.org/jira/browse/KAFKA-13879
>
> Thank you,
> Chern Yih
>


-- 
-- Guozhang


Re: [VOTE] 3.1.1 RC0

2022-04-12 Thread Guozhang Wang
Thanks Tom.

I took a quick pass on the release notes, javadocs, downloaded the rc (zip
only) and compiled with no issues. +1


Guozhang

On Fri, Apr 8, 2022 at 9:18 AM Tom Bentley  wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the first candidate for release of Apache Kafka 3.1.1.
>
> Apache Kafka 3.1.1 is a bugfix release and 29 issues have been fixed
> since 3.1.0.
>
> Release notes for the 3.1.1 release:
> https://home.apache.org/~tombentley/kafka-3.1.1-rc0/RELEASE_NOTES.html
>
> *** Please download, test and vote by Friday 15 April, 12:00 UTC
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> https://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> https://home.apache.org/~tombentley/kafka-3.1.1-rc0/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> https://home.apache.org/~tombentley/kafka-3.1.1-rc0/javadoc/
>
> * Tag to be voted upon (off 3.1 branch) is the 3.1.1 tag:
> https://github.com/apache/kafka/releases/tag/3.1.1-rc0
>
> * Documentation:
> https://kafka.apache.org/31/documentation.html
>
> * Protocol:
> https://kafka.apache.org/31/protocol.html
>
> * Successful Jenkins builds for the 3.1 branch:
> I will share a link one the build is complete.
>
> /**
>
> Thanks,
>
> Tom
>


-- 
-- Guozhang


Re: Transactions and `endOffsets` Java client consumer method

2022-03-22 Thread Guozhang Wang
Hi Chris,

Since you are using read_committed mode, the txn marker should indeed be
skipped by `endOffsets()`, no matter whether the txn was committed or
aborted. For example, if the log looks like this:

offsets:   0, 1, 2, 3, t_c(4) // or t_a(4) which means "abort marker"

then the endOffset should return "5".


That's why I'm unclear why you're seeing this issue.
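
For reference, the call in question looks like this (broker address and
topic are hypothetical); with isolation.level=read_committed the returned
value should be the last stable offset, i.e. "5" in the example above:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class EndOffsetsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "end-offsets-sketch");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        TopicPartition tp = new TopicPartition("some-transactional-topic", 0);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // With read_committed, this ListOffsets request should reflect the
            // last stable offset rather than the raw log end offset.
            Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singleton(tp));
            System.out.println("end offset = " + end.get(tp));
        }
    }
}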


Guozhang

On Tue, Mar 22, 2022 at 6:19 AM Chris Jansen 
wrote:

> Thanks Luke,
>
> If the transaction marker should be hidden, does it follow that aborted
> transaction at the end of the log should also be hidden for clients that
> are in read committed mode?
>
> Happy to do a KIP/PR.
>
> Thanks again,
>
> Chris
>
> On Tue, Mar 22, 2022 at 10:21 AM Luke Chen  wrote:
>
> > Hi Chris,
> >
> > Yes, the transaction marker should be hidden from clients.
> > There are similar issues reported:
> > https://issues.apache.org/jira/browse/KAFKA-10683
> > Welcome to submit KIP/PR to improve it.
> >
> > Thank you.
> > Luke
> >
> >
> > On Tue, Mar 22, 2022 at 5:16 PM Chris Jansen  >
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > Sorry, I should have been more clear. By "an unreadable end of the
> log",
> > I
> > > mean the `endOffsets` method returns an offset for a record that is
> never
> > > surfaced to the caller of `poll`.
> > >
> > > I've done some more digging and I think I understand why that is now.
> The
> > > API `endOffsets` calls tells the client, at a low level, what offset it
> > can
> > > read up to in the log. This may include a transaction marker or an
> > aborted
> > > transaction that gets delivered, but the client skips over when
> returning
> > > records via the `poll` method. I understand why the client must be told
> > > where to read up to, as the decision to skip chunks of the log must be
> > made
> > > by the client.
> > >
> > > What I was looking for when calling `endOffsets` was the last offset
> that
> > > would be surfaced via the `poll` method rather than where the low level
> > > client should read up to. If this is at all possible, it seems like
> this
> > > might require a broker change.
> > >
> > > Thanks
> > >
> > > Chris
> > >
> > > On Mon, Mar 21, 2022 at 5:38 PM Guozhang Wang 
> > wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > What do you mean by "an unreadable end of the log"? Could you
> > > > elaborate?
> > > >
> > > > The version you used is recent enough, and the configs seem okay, so
> > > > I think the issue is elsewhere.
> > > >
> > > > On Mon, Mar 21, 2022 at 5:51 AM Chris Jansen <
> > chris.jan...@permutive.com
> > > >
> > > > wrote:
> > > >
> > > > > So an update on this, it seems that the consumer reports an
> > unreadable
> > > > end
> > > > > of the log if the transaction has been aborted. Is this the
> intended
> > > > > behaviour?
> > > > >
> > > > > Thanks again,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Mon, Mar 21, 2022 at 12:25 PM Chris Jansen <
> > > > chris.jan...@permutive.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Guozhang,
> > > > > >
> > > > > > Thanks for getting back to me. I'm using Confluent's distribution
> > > > version
> > > > > > 6.2.2, so Kafka 2.8.1. Could there be some consumer or broker
> > > > > configuration
> > > > > > I'm missing that would change this behaviour?
> > > > > >
> > > > > > Just to confirm, here are my consumer settings:
> > > > > >
> > > > > > auto.offset.reset -> earliest, isolation.level -> read_committed,
> > > > > > group.instance.id -> , group.id ->
> > > > > > ephemeral-6c5f8338-3d94-4296-b64a-d3a5a331117e, bootstrap.servers
> > ->
> > > > > > PLAINTEXT://localhost:58300, enable.auto.commit -> false
> > > > > >
> > > > > > Thanks again,
> > > > > >
> > > > > >
> > > > > > Chris
> > > > > >
> > > > > >
> > > > > > On Sun, Mar 20, 2022 at 12:46 AM Guozhang Wang <

Re: Transactions and `endOffsets` Java client consumer method

2022-03-21 Thread Guozhang Wang
Hi Chris,

What do you mean by "an unreadable end of the log"? Could you elaborate?

The version you used is recent enough, and the configs seem okay. So I
think there is some issue elsewhere.

On Mon, Mar 21, 2022 at 5:51 AM Chris Jansen 
wrote:

> So an update on this, it seems that the consumer reports an unreadable end
> of the log if the transaction has been aborted. Is this the intended
> behaviour?
>
> Thanks again,
>
> Chris
>
> On Mon, Mar 21, 2022 at 12:25 PM Chris Jansen 
> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for getting back to me. I'm using Confluent's distribution version
> > 6.2.2, so Kafka 2.8.1. Could there be some consumer or broker
> configuration
> > I'm missing that would change this behaviour?
> >
> > Just to confirm, here are my consumer settings:
> >
> > auto.offset.reset -> earliest, isolation.level -> read_committed,
> > group.instance.id -> , group.id ->
> > ephemeral-6c5f8338-3d94-4296-b64a-d3a5a331117e, bootstrap.servers ->
> > PLAINTEXT://localhost:58300, enable.auto.commit -> false
> >
> > Thanks again,
> >
> >
> > Chris
> >
> >
> > On Sun, Mar 20, 2022 at 12:46 AM Guozhang Wang 
> wrote:
> >
> >> Hi Chris,
> >>
> >> The broker does take the isolation level in the ListOffset API (which is
> >> used for the endOffsets call) into consideration:
> >>
> >> val lastFetchableOffset = isolationLevel match {
> >>   case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
> >>   case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
> >>   case None => localLog.logEndOffset
> >> }
> >>
> >>
> >> If it is READ_COMMITTED, it would only return the last stable offset.
> >>
> >> But this requires the broker version to be new enough to actually
> >> understand the new format of the request; is your broker on a newer
> >> version? Other than that, I cannot think of a reason explaining what
> >> you saw.
> >>
> >>
> >>
> >> On Wed, Mar 16, 2022 at 5:12 AM Chris Jansen <
> chris.jan...@permutive.com>
> >> wrote:
> >>
> >> > Hello all,
> >> >
> >> > I have the need to query end offsets for a particular topic using a
> >> > consumer that has been configured with the "READ_COMMITTED" isolation
> >> > level. The response I get via the Java client
> >> > includes offsets at the end of the log that have not yet been
> committed
> >> and
> >> > therefore can't be consumed.
> >> >
> >> > After digging around in the Java client code, it looks like the
> >> isolation
> >> > level is being transmitted in the API request to the broker. So my
> >> question
> >> > is does the broker use this information when crafting a response and,
> >> if it
> >> > is taken into account, why does it return log end offsets for
> >> transactions
> >> > that have not yet been committed?
> >> >
> >> > For my own future reference I was struggling to find any details on
> the
> >> > broker API anywhere, if someone could point me in the right direction,
> >> I'd
> >> > really appreciate it.
> >> >
> >> > Thanks,
> >> >
> >> >
> >> > Chris Jansen
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>


-- 
-- Guozhang


Re: Transactions and `endOffsets` Java client consumer method

2022-03-19 Thread Guozhang Wang
Hi Chris,

The broker does take the isolation level in the ListOffset API (which is
used for the endOffsets call) into consideration:

val lastFetchableOffset = isolationLevel match {
  case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
  case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
  case None => localLog.logEndOffset
}


If it is READ_COMMITTED, it would only return the last stable offset.

But this requires the broker version to be new enough to actually
understand the new format of the request; is your broker on a newer
version? Other than that, I cannot think of a reason explaining what
you saw.



On Wed, Mar 16, 2022 at 5:12 AM Chris Jansen 
wrote:

> Hello all,
>
> I have the need to query end offsets for a particular topic using a
> consumer that has been configured with the "READ_COMMITTED" isolation
> level. The response I get via the Java client
> includes offsets at the end of the log that have not yet been committed and
> therefore can't be consumed.
>
> After digging around in the Java client code, it looks like the isolation
> level is being transmitted in the API request to the broker. So my question
> is does the broker use this information when crafting a response and, if it
> is taken into account, why does it return log end offsets for transactions
> that have not yet been committed?
>
> For my own future reference I was struggling to find any details on the
> broker API anywhere, if someone could point me in the right direction, I'd
> really appreciate it.
>
> Thanks,
>
>
> Chris Jansen
>


-- 
-- Guozhang


Re: Kafka streams and user authentication

2022-02-25 Thread Guozhang Wang
Got it.

I'm not totally sure how Spring wires the properties from the file into
the actual configs in Streams, but just a general suggestion: in Kafka
Streams you can specify the config overrides for the internally embedded
producer and consumer respectively:
https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html#kafka-consumers-and-producer-configuration-parameters

So if you can figure out how to wire the configs with the embedded producer
and consumer, then they can indeed use different accounts for reading and
writing.
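
For example, outside of the Spring property file, a rough sketch of what
such overrides could look like directly on the Streams Properties (the
usernames/passwords are placeholders, and the common SSL/SASL settings
still apply to both clients):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9093");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
// Account used by the embedded consumers (reading topic "A"):
props.put(StreamsConfig.consumerPrefix("sasl.jaas.config"),
    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"reader\" password=\"...\";");
// Account used by the embedded producer (writing topic "B"):
props.put(StreamsConfig.producerPrefix("sasl.jaas.config"),
    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"writer\" password=\"...\";");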

Guozhang

On Thu, Feb 24, 2022 at 4:22 AM Alessandro Ernesto Mascherpa <
alessandro.masche...@piksel.com> wrote:

> Hello Guozhang,
>
> For authentication I'm using the following configuration:
>
> spring.kafka.properties.sasl.jaas.config =
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="" password=" ";
> spring.kafka.properties.ssl.truststore.location=/path/to/truststore.jks
> spring.kafka.properties.ssl.truststore.password=trustore-secret
> spring.kafka.properties.ssl.truststore.type=JKS
> spring.kafka.properties.security.protocol=SASL_SSL
> spring.kafka.properties.sasl.mechanism=PLAIN
>
> defined in a .properties file. With 'account' I mean the pair
> username/password and the read/write rights the username has.
>
> If a simplified code snippet may be useful, the stream is created as:
>
> @Bean public KStream  plain(StreamsBuilder builder)  {
> KStream stream = builder.stream( "A" );
> stream.map( ... ).to( "B" );
> return stream;
> }
>
> Thanks
> Alessandro
>
>
> -Original Message-
> From: Guozhang Wang 
> Sent: mercoledì 23 febbraio 2022 19:20
> To: Users 
> Subject: Re: Kafka streams and user authentication
>
> Hello Alessandro,
>
> Could you elaborate a bit more on what authN mechanisms you are using, and
> what exactly you mean by `account`?
>
>
> Guozhang
>
> On Wed, Feb 23, 2022 at 5:10 AM Alessandro Ernesto Mascherpa <
> alessandro.masche...@piksel.com> wrote:
>
> > Hi All,
> > I'm facing a problem with user authentication in Kafka streams in
> > Kafka v.3.0.0.
> >
> > A Java application reads from topic A as a stream and, in the same
> > stream, writes to topic B.
> > The two topics are configured with two different accounts, hence is it
> > feasible for a stream to read using an account and to write using
> > another account? And if so, how should I configure the stream?
> >
> > I'm available for further information and discussion Thanks in advance
> > Alessandro
> >
> > This message is private and confidential. If you have received this
> > message in error, please notify the sender or serviced...@piksel.com
> > and remove it from your system.
> >
> > Piksel Inc is a Delaware corporation, whose registered office is 2100
> > Powers Ferry Road SE, Suite 400, Atlanta, GA 30339, USA
> >
>
>
> --
> -- Guozhang
>


-- 
-- Guozhang


Re: Kafka streams uneven task allocation

2022-02-23 Thread Guozhang Wang
Hello Navneeth,

Just to verify some behaviors, could you try 1) not using the instance.id
config, hence no static members, and 2) upgrading to the latest version of
Kafka, each on its own (i.e. do not do them at the same time), and see if
either one of them helps with the imbalance issue?

On Sun, Feb 20, 2022 at 2:17 AM Luke Chen  wrote:

> Hi Navneeth,
>
> To know the reason why there's more than one partition in the same stream
> task, we should know why the rebalance triggered.
> That might have to look into the logs.
>
> > I have configured standby to be 1 which means there will be
> one more copy of the state store and warm up by default is 2. What's the
> difference, will there be 2 copies now?
>
> You should have "at most" 3 copies now, which is 1 standby + 2 warmup when
> having enough Kafka stream instances.
>
> Thank you.
> Luke
>
> On Sat, Feb 19, 2022 at 2:18 PM Navneeth Krishnan <
> reachnavnee...@gmail.com>
> wrote:
>
> > Hi Guozhang,
> >
> > Thanks and sorry for the late reply. I'm overriding the
> > GROUP_INSTANCE_ID_CONFIG
> > & APPLICATION_SERVER_CONFIG.
> > Rest all are defaults. Even then I see more than one partition being
> > allocated to the same stream task.
> >
> > Also I have an additional question regarding the replicas. The default
> > values for the configs num.standby.replicas & max.warmup.replicas are 0
> & 2
> > respectively. I have configured standby to be 1 which means there will be
> > one more copy of the state store and warm up by default is 2. What's the
> > difference, will there be 2 copies now?
> >
> > Thanks
> >
> > On Fri, Feb 4, 2022 at 1:13 AM Guozhang Wang  wrote:
> >
> > > Hello Navneeth,
> > >
> > > Could you describe how you ended up with more than one partition
> > > assigned to the same thread after certain rebalance(s)? Do you override
> > any
> > > default config values such as instance.id (for static consumer
> members),
> > > etc?
> > >
> > > Also I'd suggest upgrading to a newer version --- we just released
> 3.1.0
> > > --- since we've made many improvements / fixed bugs around rebalances and
> > > assignment logic since 2.6.
> > >
> > >
> > > Guozhang
> > >
> > > On Wed, Feb 2, 2022 at 9:37 AM Navneeth Krishnan <
> > reachnavnee...@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > We are facing an issue with our kafka streams application due to
> uneven
> > > > task allocation. There are 100 partitions in the input topic with 100
> > > > stream threads processing the data. Everything works well when each
> > task
> > > > gets assigned with 1 partition. But when more than one partition is
> > > > assigned to the same thread then it causes delay in processing
> causing
> > a
> > > > huge backlog.
> > > >
> > > > How is everyone handling this? This creates a huge impact to the
> > > > application SLA and we would like to minimize such cases. Any
> > suggestions
> > > > would be appreciated.
> > > >
> > > > Note: we are on version 2.6.2
> > > >
> > > > Thanks,
> > > > Navneeth
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>


-- 
-- Guozhang


Re: Kafka streams and user authentication

2022-02-23 Thread Guozhang Wang
Hello Alessandro,

Could you elaborate a bit more on what authN mechanisms you are using, and
what exactly you mean by `account`?


Guozhang

On Wed, Feb 23, 2022 at 5:10 AM Alessandro Ernesto Mascherpa <
alessandro.masche...@piksel.com> wrote:

> Hi All,
> I'm facing a problem with user authentication in Kafka streams in Kafka
> v.3.0.0.
>
> A Java application reads from topic A as a stream and, in the same stream,
> writes to topic B.
> The two topics are configured with two different accounts, hence is it
> feasible for a stream to read using an account and to write using another
> account? And if so, how should I configure the stream?
>
> I'm available for further information and discussion
> Thanks in advance
> Alessandro
>
> This message is private and confidential. If you have received this
> message in error, please notify the sender or serviced...@piksel.com and
> remove it from your system.
>
> Piksel Inc is a Delaware corporation, whose registered office is 2100
> Powers Ferry Road SE, Suite 400, Atlanta, GA 30339, USA
>


-- 
-- Guozhang


[ANNOUNCE] New committer: Luke Chen

2022-02-09 Thread Guozhang Wang
The PMC for Apache Kafka has invited Luke Chen (showuon) as a committer and
we are pleased to announce that he has accepted!

Luke has been actively contributing to Kafka since early 2020. He has
made more than 120 commits on various components of Kafka, with notable
contributions to the rebalance protocol in Consumer and Streams (KIP-766,
KIP-726, KIP-591, KAFKA-12675 and KAFKA-12464, to just name a few), as well
as making an impact on improving test stability of the project. Aside from
all his code contributions, Luke has been a great participant in
discussions across the board, a very active and helpful reviewer of other
contributors' works, all of which are super valuable and highly appreciated
by the community.


Thanks for all of your contributions Luke. Congratulations!

-- Guozhang, on behalf of the Apache Kafka PMC


Re: What versions are supported by community?

2022-02-05 Thread Guozhang Wang
Hi,

There's a wiki page regarding Kafka's release policy:
https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan

And regarding the EOL: "Given 3 releases a year and the fact that no one
upgrades three times a year, we propose making sure (by testing!) that
rolling upgrade can be done from each release in the past year (i.e. last 3
releases) to the latest version. We will also attempt, as a community to do
bugfix releases as needed for the last 3 releases."

This is not exactly what you meant by "supported by the community", but in
general we only try to release bug fixes for the last 3 releases. Hope this
helps with your planning.


Guozhang


On Fri, Feb 4, 2022 at 3:15 PM Israel Ekpo  wrote:

> To my knowledge, such policy/practice does not exist for the Apache Kafka
> project.
>
> From time to time certain environments and tool support like Java and Scala
> versions have been deprecated and dropped but o don’t think this applies to
> Kafka versions
>
> Bug and security fixes are typically applied to recent major and patch
> versions
>
> On Fri, Feb 4, 2022 at 3:52 PM Doug Whitfield 
> wrote:
>
> > I have found the Confluent EOL schedule, but I have not been able to find
> > the EOL schedule for Apache Kafka. Does such a policy exist?
> >
> > Best Regards,
> > --
> >
> > Doug Whitfield | Enterprise Architect, OpenLogic
> >
> >
> >
> >
> > This e-mail may contain information that is privileged or confidential.
> If
> > you are not the intended recipient, please delete the e-mail and any
> > attachments and notify us immediately.
> >
> > --
> Israel Ekpo
> Lead Instructor, IzzyAcademy.com
> https://www.youtube.com/c/izzyacademy
> https://izzyacademy.com/
>


-- 
-- Guozhang


Re: Error triming topology

2022-02-03 Thread Guozhang Wang
Hello Murilo,

Could you elaborate a bit more on how you "trimmed" the topology? For
example:

1) Did you change the code so that only 6 sub-topologies will be built, and
their sub-topology ids stay the same, i.e. you just trimmed the last 3
sub-topologies with ids 6, 7, 8?
2) Did you delete the local state dir for those sub-topologies?
3) Did you delete all the repartition/changelog topics for those
sub-topologies?
4) For the remaining 6 sub-topologies, do their state store names and
topic names all remain the same? --- I understand that they have static
names, but do they have numerical suffixes that get changed? (One way to
check is shown in the snippet below.)
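
One way to double-check 1) and 4) is to print and diff the topology
description before and after the trimming, e.g. (assuming you build the
topology from a StreamsBuilder):

Topology topology = builder.build();
// The description lists each sub-topology id together with its source topics,
// processors, and state store names, so any renumbering or renaming shows up directly.
System.out.println(topology.describe());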

On Tue, Jan 25, 2022 at 6:43 PM Murilo Tavares  wrote:

> Hi
> I have a KafkaStreams application that is too heavyweight, with 9
> sub-topologies.
> I am trying to disable some unneeded part of the topology that is
> completely independent of the rest of the topology. Since my state stores
> have fixed, predictable names, I compared the topologies and I believe it
> should be safe to trim some sub-topologies.
> After trimming the unused ones, it now has 6 sub-topologies.
> Nevertheless, the application won't start. It seems to be trying to recover
> previous tasks, that shouldn't exist anymore.
> I have let the application down for 30 min so any timeouts, like session or
> polling timeouts could expire, but still, when the application starts, it
> reads the task states from somewhere and fails to recover it...
> Here's the log (note the "unknown task 7_0", which makes sense since the
> number of topologies felt from 9 to 6):
>
> 2022-01-26 02:28:17.552 [asdasdasd-StreamThread-1] INFO
>
>  
> org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor
> - Decided on assignment:
> {1699fdc2-3121-4987-9fb2-26fc5bd4fb48=[activeTasks: ([0_0, 0_1, 1_0, 1_1,
> 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1]) standbyTasks: ([])
> prevActiveTasks: ([]) prevStandbyTasks: ([0_0, 0_1, 1_0, 1_1, 2_0, 2_1,
> 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1, 7_0, 7_1, 8_0, 8_1, 9_0, 9_1])
> changelogOffsetTotalsByTask: ([0_0=1244818, 0_1=625988, 1_0=15255,
> 1_1=64645, 2_0=670938, 2_1=100636, 3_0=6379662, 3_1=5600072, 4_0=2362,
> 4_1=15224, 5_0=19577, 5_1=113994, 6_0=7403980, 6_1=9195079, 7_0=226722,
> 7_1=76623, 8_0=7334, 8_1=66344, 9_0=0, 9_1=39]) taskLagTotals: ([0_0=3,
> 0_1=3, 1_0=1, 1_1=1, 2_0=1, 2_1=1, 3_0=7, 3_1=7, 4_0=1, 4_1=1, 5_0=1,
> 5_1=1, 6_0=1, 6_1=1]) capacity: 1 assigned: 14]} with no followup probing
> rebalance.
> 2022-01-26 02:28:17.558 [asdasdasd-StreamThread-1] INFO
>  org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor -
> stream-thread [asdasdasd-StreamThread-1-consumer] Assigned tasks [0_0, 0_1,
> 1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1] including
> stateful [0_0, 0_1, 1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0,
> 6_1] to clients as:
> 1699fdc2-3121-4987-9fb2-26fc5bd4fb48=[activeTasks: ([0_0, 0_1, 1_0, 1_1,
> 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1]) standbyTasks: ([])].
> 2022-01-26 02:28:17.566 [asdasdasd-StreamThread-1] INFO
>  org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> [Consumer instanceId=asdasdasd-1,
> clientId=asdasdasd-StreamThread-1-consumer, groupId=inventory-assembler-4]
> Rebalance failed.
> java.lang.IllegalStateException: Tried to lookup lag for unknown task 7_0
> at
>
> org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:318)
> ~[app.jar:?]
> at
>
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor$$Lambda$534.F9BC5F20.applyAsLong(Unknown
> Source) ~[?:?]
> at java.util.Comparator.lambda$comparingLong$6043328a$1(Unknown Source)
> ~[?:?]
> at java.util.Comparator$$Lambda$535.F9B1D820.compare(Unknown
> Source) ~[?:?]
> at java.util.Comparator.lambda$thenComparing$36697e65$1(Unknown Source)
> ~[?:?]
> at java.util.Comparator$$Lambda$399.FEBE6620.compare(Unknown
> Source) ~[?:?]
> at java.util.TreeMap.put(Unknown Source) ~[?:?]
> at java.util.TreeSet.add(Unknown Source) ~[?:?]
> at java.util.AbstractCollection.addAll(Unknown Source) ~[?:?]
> at java.util.TreeSet.addAll(Unknown Source) ~[?:?]
> at
>
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1205)
> ~[app.jar:?]
> at
>
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1119)
> ~[app.jar:?]
> at
>
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:845)
> ~[app.jar:?]
> at
>
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:405)
> ~[app.jar:?]
> at
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
> [app.jar:?]
> at
>
> 

Re: Kafka streams uneven task allocation

2022-02-03 Thread Guozhang Wang
Hello Navneeth,

Could you describe how you ended up with more than one partition
assigned to the same thread after certain rebalance(s)? Do you override any
default config values such as instance.id (for static consumer members),
etc?

Also I'd suggest upgrading to a newer version --- we just released 3.1.0
--- since we've made many improvements / fixed bugs around rebalances and
assignment logic since 2.6.


Guozhang

On Wed, Feb 2, 2022 at 9:37 AM Navneeth Krishnan 
wrote:

> Hi All,
>
> We are facing an issue with our kafka streams application due to uneven
> task allocation. There are 100 partitions in the input topic with 100
> stream threads processing the data. Everything works well when each task
> gets assigned with 1 partition. But when more than one partition is
> assigned to the same thread then it causes delay in processing causing a
> huge backlog.
>
> How is everyone handling this? This creates a huge impact to the
> application SLA and we would like to minimize such cases. Any suggestions
> would be appreciated.
>
> Note: we are on version 2.6.2
>
> Thanks,
> Navneeth
>


-- 
-- Guozhang


Re: Kafka Streams - Stream threads processing two input topics

2022-01-10 Thread Guozhang Wang
Hi Miguel,

I suspect it's due to the timestamps in your topic A, which are earlier
than those in topic B. Note that Kafka Streams tries to synchronize joined
topics by processing records with smaller timestamps first, and hence if
topic A's messages have smaller timestamps, they will be selected ahead of
topic B's.

The reason why going through a repartition topic alleviates the problem is
that the first topology resets the timestamps on the repartition topic to
values closer to the processing time, and hence closer to topic B's message
timestamps.
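
Regarding `max.task.idle.ms` that you mentioned: it is indeed the config
that controls this synchronization behavior; a minimal sketch in the same
style as your config map (the value is just illustrative):

// Let a task wait up to 2 seconds for data from its lagging input partition
// before processing buffered records, so the join inputs stay roughly time-ordered.
config.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 2000L);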


Guozhang

On Mon, Jan 10, 2022 at 10:05 AM Miguel González 
wrote:

> Hello
>
> We are consuming two topics (A and B) and joining them, but I have noticed
> no matter what I do, topic A gets consumed first in a batch and then topic
> B , increasing *num.stream.threads* will only get topic A process a lot of
> records faster. Topic B has lots of messages compared to Topic A
>
>
> Here are my settings:
>
> Map config = new HashMap<>();
> config.put(StreamsConfig.APPLICATION_ID_CONFIG, streamingAppName);
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
> config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> Serdes.StringSerde.class);
> config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
> StreamsConfig.EXACTLY_ONCE);
> config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
> config.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO");
> config.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
>
>
> For now we tried splitting the topology into two topologies to have one
> thread for each of the topics that somewhat has alleviated the problem,
>
> For the first topology with the topic A we have updated the settings like
> so to try to limit the amount of messages fetched, and since we do a
> repartition the second topology will join with that repartition topic
>
> properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,
> streamingAppName + "-reKey-app");
> properties.setProperty(consumerPrefix(MAX_POLL_RECORDS_CONFIG), "20");
> properties.setProperty(consumerPrefix(FETCH_MAX_BYTES_CONFIG),
> Integer.toString(30 * 1024));
>
>
>
> Is there a better way?
>
> I was encouraged to use *max.task.idle.ms * to
> avoid one topic's processing going much faster than the other, but I'm not
> sure if that will help with my issue.
>
>
> I'm not really sure about StreamThreads and Tasks and how they work.
>
> Any help is appreciated.
>


-- 
-- Guozhang


Re: How to properly use a clean a TimestampedKeyValueStore

2022-01-10 Thread Guozhang Wang
Hi Miguel,

I checked your code and it seems fine to me, so I would not suspect
anything in your logic. The next thing I'd suggest is to check whether you
have many cases where the same key gets deleted and then re-inserted (you
can add some logging at the `put` and `delete` calls).
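
As a side note on iterator handling, here is a minimal sketch of such a
cleanup routine (the store's key/value types and the retention check are
placeholders) that guarantees the iterator is closed:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

void cleanUp(final TimestampedKeyValueStore<String, String> kvStore, final long cutoffMs) {
    // try-with-resources makes sure the store iterator is always closed
    try (final KeyValueIterator<String, ValueAndTimestamp<String>> iter = kvStore.all()) {
        while (iter.hasNext()) {
            final KeyValue<String, ValueAndTimestamp<String>> entry = iter.next();
            if (entry.value.timestamp() < cutoffMs) {
                kvStore.delete(entry.key);  // each delete writes one tombstone to the changelog
            }
        }
    }
}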


Guozhang

On Tue, Jan 4, 2022 at 6:31 PM Miguel González 
wrote:

> Hello
>
> Here's my store definition and my transformer with the whole logic (I
> included the transformer with issues and another version (V2) of the
> transformer with some improvements that I believe are necessary)
>
> https://gist.github.com/magg/576bf3381c9c0501b9761b54e9d86375
>
> Thanks
> - Miguel
>
>
>
>
>
> On Tue, Jan 4, 2022 at 5:21 PM Guozhang Wang  wrote:
>
> > Hi Miguel,
> >
> > How is your kvStore being constructed? Could you paste the snippet of the
> > related construction code, as well as the related iterating / deletion
> code
> > here?
> >
> > On Tue, Jan 4, 2022 at 2:25 PM Matthias J. Sax  wrote:
> >
> > > Not 100% sure. From what you describe it should work as expected.
> > >
> > > It seems `delete()` does not delete the key from the store (ie,
> RocksDB)
> > > itself (for unknown reasons)?
> > >
> > > Are you closing all your iterators correctly? (More or less a wild
> guess
> > > at the moment.)
> > >
> > > Did you enable caching for the store? (Just to double check if it could
> > > be caching related or not.)
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 12/24/21 11:08 AM, Miguel González wrote:
> > > > Hello
> > > >
> > > > I'm using Kafka Streams and I have a transformer that uses
> > > > a TimestampedKeyValueStore, I have a punctuator that is in charge of
> > > > cleaning the store,
> > > >
> > > > Basically I'm iterating the store using kvStore.all() and deleting
> the
> > > keys
> > > > based on some logic with kvStore.delete(key);
> > > >
> > > > I'm seeing the changelog topic for the store grow unbounded, I'm
> seeing
> > > > many values with null for the same keys... I think those are called
> > > > tombstones right?  but the punctuator is constantly doing the same
> > thing
> > > > trying to delete the same keys.. I see more tombstones being
> inserted.
> > > >
> > > > Is this the expected behavior? If so, how can I correctly clean that
> > > store?
> > > >
> > > > thanks
> > > > - Miguel
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: How to properly use a clean a TimestampedKeyValueStore

2022-01-04 Thread Guozhang Wang
Hi Miguel,

How is your kvStore being constructed? Could you paste the snippet of the
related construction code, as well as the related iterating / deletion code
here?

On Tue, Jan 4, 2022 at 2:25 PM Matthias J. Sax  wrote:

> Not 100% sure. From what you describe it should work as expected.
>
> It seems `delete()` does not delete the key from the store (ie, RocksDB)
> itself (for unknown reasons)?
>
> Are you closing all your iterators correctly? (More or less a wild guess
> at the moment.)
>
> Did you enable caching for the store? (Just to double check if it could
> be caching related or not.)
>
>
> -Matthias
>
>
> On 12/24/21 11:08 AM, Miguel González wrote:
> > Hello
> >
> > I'm using Kafka Streams and I have a transformer that uses
> > a TimestampedKeyValueStore, I have a punctuator that is in charge of
> > cleaning the store,
> >
> > Basically I'm iterating the store using kvStore.all() and deleting the
> keys
> > based on some logic with kvStore.delete(key);
> >
> > I'm seeing the changelog topic for the store grow unbounded, I'm seeing
> > many values with null for the same keys... I think those are called
> > tombstones right?  but the punctuator is constantly doing the same thing
> > trying to delete the same keys.. I see more tombstones being inserted.
> >
> > Is this the expected behavior? If so, how can I correctly clean that
> store?
> >
> > thanks
> > - Miguel
> >
>


-- 
-- Guozhang


Re: Kafka Streams threads sometimes fail transaction and are fenced after broker restart

2021-12-21 Thread Guozhang Wang
Hello Pieter,

Thanks for bringing this to the community's attention. After reading your
description I suspect you're hitting this issue:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-588%3A+Allow+producers+to+recover+gracefully+from+transaction+timeouts

Basically, today we do not try to distinguish between the fatal and the
recoverable case, and just conservatively treat both of them as fatal,
causing e.g. Streams' embedded producers to be shut down and restarted and
hence triggering rebalances.


Guozhang

On Tue, Dec 21, 2021 at 7:07 AM Pieter Hameete 
wrote:

> Hi all,
>
> After looking for an answer / some discussion on this matter on the
> community Slack and StackOverflow<
> https://stackoverflow.com/questions/70335773/kafka-streams-apps-threads-fail-transaction-and-are-fenced-and-restarted-after-k>
> this mailing list is my last hope :-)
>
> We are noticing that our streams apps threads sometimes fail their
> transaction and get fenced after a broker restart. After the broker has
> started up again the streams apps log either an
> InvalidProducerEpochException: Producer attempted to produce with an old
> epoch or a ProducedFencedException: There is a newer producer with the same
> transactionalId which fences the current one. After these exceptions the
> thread dies and gets restarted, which causes rebalancing and a delay in
> processing for the partitions assigned to that thread.
>
> Some more details on our setup:
>
>   1.  We use Kafka 2.8 (Confluent Platform 6.2) for Brokers and 2.8.1 for
> streams apps.
>   2.  To ensure smooth broker restarts we use controlled shutdown for our
> brokers, and restart them 1-by-1 while waiting for all partitions to be
> in-sync before restarting.
>   3.  We use three brokers, with min in-sync replicas set to 2. As far as
> I know this should facilitate broker restarts that don't affect clients
> given 2.
>   4.  The streams apps are configured with a group instance id and a
> session timeout that allows for smooth restarts of the streams apps.
>
> In the logs we noticate that during Broker shutdown the clients log
> NOT_LEADER_OR_FOLLOWER exceptions (this is to be expected when partition
> leadership is being migrated). Then we see heartbeats failing (expected
> because broker shutting down, group leadership is migrated). Then we see
> discovering of a new group coordinator (expected, but bounces a bit between
> the old and new leader which I didnt expect). Finally the app stabilizes
> with a new group coordinator.
>
> Then after the broker starts up again we see the clients log
> FETCH_SESSION_ID_NOT_FOUND exceptions for the starting broker. The starting
> broker is rediscovered as a transaction coordinator. Shortly after that the
> InvalidProducerEpochExceptions and ProducedFencedExceptions occur for some
> Streams app threads causing the thread fencing and restarting.
>
> What could be reason for this happening? My first guess would be that the
> starting broker is taking over a transaction coordinator before it has
> synced its transaction states with the in-sync brokers. This difference in
> transaction state could be a reason the starting broker disagrees on the
> current producer epoch and/or transactional ids.
>
> Does anyone with more knowledge on this topic have an idea what could be
> causing the exceptions? Or how we could get more information on what's
> going on here.
>
> Best regards and thank you in advance!
>
> Pieter Hameete
>


-- 
-- Guozhang


Re: Hi Team,

2021-11-17 Thread Guozhang Wang
Hello,

Several things need to be checked here:

1) Is your group id correct? Does it exist, and is its version sufficiently
new that it stores its metadata on Kafka rather than on ZK?
2) Is the bootstrap server list correct? Is that IP reachable?
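
Also, note that `describedGroups()` only returns futures; you still need to
block on them to get the actual description. A rough sketch (error handling
omitted, using the group id from your snippet):

import java.util.Collections;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;

ConsumerGroupDescription description =
    adminClient.describeConsumerGroups(Collections.singleton(groupId))
               .describedGroups()
               .get(groupId)
               .get();  // blocks until the broker responds; throws on timeout or auth errors
for (MemberDescription member : description.members()) {
    System.out.println(member.consumerId() + " -> " + member.assignment().topicPartitions());
}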



On Wed, Nov 17, 2021 at 12:24 AM wan...@emrubik.com 
wrote:

> sorry ,  the picture is not displayed
>
>
>
> *From:* wan...@emrubik.com
> *Date:* 2021-11-17 16:10
> *To:* users 
> *Subject:* Hi Team,
> Hi Team,
>
> I  want obtain consumer group information through the Kafka JAVA API ,As
> shown in figure
>
>
>
> Properties properties = new Properties();
> properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
> "10.127.16.2:9092");
>
> try {
> String groupId = "gwdp-hdb-cq-t1";
> adminClient = AdminClient.create(properties);
> DescribeConsumerGroupsResult describeConsumerGroupsResult = 
> adminClient.describeConsumerGroups(Arrays.asList(groupId));
> Map<String, KafkaFuture<ConsumerGroupDescription>> map = 
> describeConsumerGroupsResult.describedGroups();
>
>
> The ConsumerGroupDescription object does not get the information. how to get 
> it ?
>
> many thanks in advance
>
>
>

-- 
-- Guozhang


Re: Producer Timeout issue in kafka streams task

2021-11-01 Thread Guozhang Wang
Hello Pushkar,

I'm assuming you have the same Kafka version (2.5.1) at the Streams client
side here: in those old versions, Kafka Streams relies on the embedded
Producer clients to handle timeouts, which requires users to correctly
configure such values.

In newer versions (2.8+) we have made Kafka Streams more robust to
server-side disconnects or soft failures that may cause timeouts:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams.
So I'd suggest you upgrade to one of those versions and see if the symptoms
go away.
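
For reference, once you are on 2.8+, the retry window that KIP-572
introduces is controlled by `task.timeout.ms`; a minimal sketch (the value
shown is just the documented default):

// Streams retries timed-out operations internally for up to task.timeout.ms
// before surfacing the error; 300000 ms (5 minutes) is the KIP-572 default.
props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 300000L);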


Guozhang

On Sun, Oct 31, 2021 at 5:59 AM Pushkar Deole  wrote:

> Hi All,
>
> I am getting below issue in streams application. Kafka cluster is a 3
> broker cluster (v 2.5.1) and I could see that 2 of the 3 brokers restarted
> at the same time when below exception occurred in streams application so I
> can relate below exception to those brokers restarts. However, what is
> worrying me is the streams application did not process any events after
> below exception. So the question is:
> 1. how can i make the streams application resilient to broker issues e.g.
> the producer underneath streams should have connected to another broker
> instance at the time 1 broker went down, but possible the 2nd broker went
> down immediately that's why it timed out
> 2. In general how does streams handle broker issue and when does it decide
> to connect to another broker instance in case one instance seems to be in
> error?
>
>
> {"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception
> processing processor thread -
>
> analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
> stream - task [0_5] Abort sending since an error caught with a previous
> record (timestamp 1635596258179) to topic analytics-incoming-feed due to
> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
> analytics-incoming-feed-4:12 ms has passed since batch
> creation\nTimeout exception caught when sending record to topic
> analytics-incoming-feed. This might happen if the producer cannot send data
> to the Kafka cluster and thus, its internal buffer fills up. This can also
> happen if the broker is slow to respond, if the network connection to the
> broker was interrupted, or if similar circumstances arise. You can increase
> producer parameter `max.block.ms` to increase this
>
> timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
> task [0_5] Abort sending since an error caught with a previous record
> (timestamp 1635596258179) to topic analytics-incoming-feed due to
> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
> analytics-incoming-feed-4:12 ms has passed since batch
> creation\nTimeout exception caught when sending record to topic
> analytics-incoming-feed. This might happen if the producer cannot send data
> to the Kafka cluster and thus, its internal buffer fills up. This can also
> happen if the broker is slow to respond, if the network connection to the
> broker was interrupted, or if similar circumstances arise. You can increase
> producer parameter `max.block.ms` to increase this timeout.\n\tat
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n\tat
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n\tat
>
> datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n\tat
>
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n\tat
>
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n\tat
>
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)\n\tat
>
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:676)\n\tat
>
> org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:380)\n\tat
>
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)\n\tat
>
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)\n\tat
> java.base/java.lang.Thread.run(Unknown Source)\nCaused by:
> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
> analytics-incoming-feed-4:12 ms has passed since batch creation\n"}
>


-- 
-- Guozhang


Re: Stream to KTable internals

2021-11-01 Thread Guozhang Wang
Hello Chad,

From your earlier comment, you mentioned "In my scenario the records were
written to the KTable topic before the record was written to the KStream
topic." So I think Matthias and others have excluded this possibility while
trying to help investigate.

If only the matching records from the KStream are returned via a single
consumer poll call but not the corresponding records from the KTable, then you would
miss this matched join result.

Guozhang


On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler 
wrote:

> Thank you for your response and the links to the presentations.
>
>
> *However, this seems tobe orthogonal to your issue?*
>
> Yes. From what I see in the code it looks like you have a single consumer
> subscribed to multiple topics. Please correct me if I'm wrong.
>
>
> *By default, timestamp synchronization is disabled. Maybeenabling it would
> help?*
>
> We are using a timestamp extractor that returns 0. We did that because we
> were almost always missing joins on startup, and this seemed to be the only
> way to bootstrap enough records at startup to avoid the missed join. We
> found a post that said doing that would make the KTable act like the
> GlobalKTable at startup. So far this works great, we never miss a join on a
> startup. If I use "timestamp synchronization" do I have to remove the zero
> timestamp extractor? If I remove the zero timestamp extractor will
> timestamp synchronization take care of the missed join issue on startup?
>
> I'm guessing the issue here is that occasionally the poll request is not
> returning the matching record for the KTable side of the join before the
> task goes off and starts processing records. Later when we put the same
> record on the topic and the KTable has had a chance to load more records
> the join works and everything is good to go. Because of the way our system
> works no new status records have been written and so the new record joins
> against the correct status.
>
> Do you agree that the poll request is returning the KStream record but not
> returning the KTable record and therefore the join is getting missed? If
> you don't agree, what do you think is going on? Is there a way to prove
> this out?
>
> Thanks,
> Chad
>
> On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax  wrote:
>
> > Yes, a StreamThread has one consumer. The number of StreamThreads per
> > instance is configurable via `num.stream.threads`. Partitions are
> > assigned to threads similar to consumer is a plain consumer group.
> >
> > It seems you run with the default of one thread per instance. As you
> > spin up 12 instances, it results in 12 threads for the application. As
> > you have 12 partitions, using more threads won't be useful as no
> > partitions are left for them to process.
> >
> > For a stream-table joins, there will be one task per "partition pair"
> > that computes the join for those partitions. So you get 12 tasks, and
> > each thread processes one task in your setup. Ie, a thread consumer is
> > reading data for both input topics.
> >
> > Pausing happens on a per-partition bases: for joins there is two buffers
> > per task (one for each input topic partition). It's possible that one
> > partition is paused while the other is processed. However, this seems to
> > be orthogonal to your issue?
> >
> > For a GlobalKTable, you get an additional GlobalThread that only reads
> > the data from the "global topic" to update the GlobalKTable. Semantics
> > of KStream-KTable and KStream-GlobalKTable joins are different: Cf
> >
> >
> https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
> >
> > For the timestamp synchronization, you may checkout `max.task.idle.ms`
> > config. By default, timestamp synchronization is disabled. Maybe
> > enabling it would help?
> >
> > You may also check out slides 34-38:
> >
> >
> https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
> >
> > There is one corner case: if two records with the same timestamp come
> > it, it's not defined which one will be processed first.
> >
> > Hope this helps.
> >
> >
> > -Matthias
> >
> >
> > On 10/30/21 6:45 AM, Chad Preisler wrote:
> > > Yes, this helped. I have some additional questions.
> > >
> > > Does StreamThread have one consumer? (Looks like it, but just want to
> > > confirm)
> > > Is there a separate StreamThread for each topic including the KTable?
> > > If a KTable is a StreamThread and there is a  StreamTask for that
> KTable,
> > > could my buffer be getting filled up, and the mainConsumer for the
> KTable
> > > be getting paused? I see this code in StreamTask#addRecords.
> > >
> > > // if after adding these records, its partition queue's buffered size
> has
> > > been
> > >  // increased beyond the threshold, we can then pause the
> > > consumption for this partition
> > >  if (newQueueSize > maxBufferedSize) {
> > >  mainConsumer.pause(singleton(partition));
> > >  }
> > >
> > > Is there any 

Re: Neverending KafkaStreams rebalance

2021-10-20 Thread Guozhang Wang
Hello Murilo,

I think what you've discovered is a processing spike: each poll call can
itself return up to 1000 records, and processing them may then take longer
(if, say, they hit the p99 percentile latency of your processing logic).
After your config change, the thread is less sensitive to such processing
latency spikes and hence less likely to drop out of the group and trigger
another rebalance.

After that, the "unstable assignment" message is a special rebalance
protocol such that when new tasks need to be migrated to a host which has
no state before, it would be first started as a "wamup" to bootstrap the
state and then migrate, but it's costs are that more rebalances will be
triggered; this is a normal scenario especially if you have just escaped
with a continuous rebalance storms that are due to rebalances and hence
cause a lot of states needed to be bootstrapped (and changing the configs
may exaggerate the situation actually), you can find more details about its
technical designs on
https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol
and
https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams.
For this, I'd suggest you to keep a relatively high value for
`restore.consumer.max.poll.records` while keep your current settings for
`main.consumer.max.poll.records` and see if that separation helps.
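
Concretely, that split would look something like this in the Streams config
(the numbers are just illustrative):

// Restoration can poll large batches since it only replays the changelog:
props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000);
// Regular processing polls smaller batches so each loop stays well within max.poll.interval.ms:
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 500);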


Guozhang

On Thu, Oct 14, 2021 at 9:05 AM Murilo Tavares  wrote:

> Hi Guozhang
> Thanks for your response. I'm on KafkaStreams 2.8.1.
> Since you asked, is KafkaStreams 3.0.0 compatible with a 2.4.1 broker?
>
> But I found the issue.
> TL;DR: I increased max.poll.interval.ms and decreased max.poll.records
> which
> fixed the problem.
>
> I noticed the StreamThread summary logs
> Processed X total records, ran X punctuators, and committed X total tasks
> since the last update
> every 3 minutes and so. Looking at the code, I noticed they should be
> printed after a polling loop is completed as long as at least 2 min have
> passed since the last summary.
>
> Then I noticed that sometimes these messages would have a larger interval,
> in around 10 min or so. So I got the conclusion that polling was taking too
> long.
> So I increased max.poll.interval.ms and decreased max.poll.records (which
> we btw had on a non-default value of 1000, rather than 500).
> That improved the problem, as the polling loops would not timeout very
> often until it caught up.
> The weird thing I noticed is that, even after caught up and no more polling
> timeouts, we still had rebalances for an extra 2h, with messages saying:
> Finished unstable assignment of tasks
> For some reason, tasks seemed to keep migrating for 2h even after the
> polling stopped timing out and the message backlog had been processed.
> But now the service is stable again.
>
> Thanks
> Murilo
>
>
>
> On Wed, 13 Oct 2021 at 14:27, Guozhang Wang  wrote:
>
> > Hi Murilo, which version of Kafka Streams are you using? Could you try
> the
> > latest 3.0.0 release if it is not yet on that version?
> >
> > On Wed, Oct 13, 2021 at 8:02 AM Murilo Tavares 
> > wrote:
> >
> > > Hi
> > > I have a large, stateful, KafkaStreams application that is on a never
> > > ending rebalance loop.
> > > I can see that Task restorations take a lng time (circa 30-45 min).
> > And
> > > after that I see this error.
> > > This is followed by tasks being suspended, and the instance re-joining
> > the
> > > group and a new rebalance is triggered.
> > > Any ideas on how to fix this?
> > >
> > > WARN org.apache.kafka.streams.processor.internals.StreamThread -
> stream-
> > > thread [inventory-streams-green-0-StreamThread-1] Detected that the
> > thread
> > > is being fenced. This implies that this thread missed a rebalance and
> > > dropped out of the consumer group. Will close out all assigned tasks
> and
> > > rejoin the consumer group.
> > > org.apache.kafka.streams.errors.TaskMigratedException: Consumer
> > committing
> > > offsets failed, indicating the corresponding thread is no longer part
> of
> > > the group; it means all tasks belonging to this thread should be
> > migrated.
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction
> > > (TaskManager.java:1141) ~[app.jar:?] at
> > >
> >
> org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(
> > > TaskManager.java:541) ~[app.jar:?] at
> > >
> > >
>

Re: Neverending KafkaStreams rebalance

2021-10-13 Thread Guozhang Wang
Hi Murilo, which version of Kafka Streams are you using? Could you try the
latest 3.0.0 release if it is not yet on that version?

On Wed, Oct 13, 2021 at 8:02 AM Murilo Tavares  wrote:

> Hi
> I have a large, stateful, KafkaStreams application that is on a never
> ending rebalance loop.
> I can see that Task restorations take a lng time (circa 30-45 min). And
> after that I see this error.
> This is followed by tasks being suspended, and the instance re-joining the
> group and a new rebalance is triggered.
> Any ideas on how to fix this?
>
> WARN org.apache.kafka.streams.processor.internals.StreamThread - stream-
> thread [inventory-streams-green-0-StreamThread-1] Detected that the thread
> is being fenced. This implies that this thread missed a rebalance and
> dropped out of the consumer group. Will close out all assigned tasks and
> rejoin the consumer group.
> org.apache.kafka.streams.errors.TaskMigratedException: Consumer committing
> offsets failed, indicating the corresponding thread is no longer part of
> the group; it means all tasks belonging to this thread should be migrated.
> at
>
> org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction
> (TaskManager.java:1141) ~[app.jar:?] at
> org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(
> TaskManager.java:541) ~[app.jar:?] at
>
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked
> (StreamsRebalanceListener.java:95) ~[app.jar:?] at
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked
> (ConsumerCoordinator.java:312) ~[app.jar:?] at
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete
> (ConsumerCoordinator.java:408) ~[app.jar:?] at
>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded
> (AbstractCoordinator.java:449) ~[app.jar:?] at
>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup
> (AbstractCoordinator.java:365) ~[app.jar:?] at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:508) ~[app.jar:?] at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded
> (KafkaConsumer.java:1261) ~[app.jar:?] at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230
> ) ~[app.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:1210) ~[app.jar:?] at
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(
> StreamThread.java:925) ~[app.jar:?] at
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(
> StreamThread.java:885) ~[app.jar:?] at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> StreamThread.java:720) [app.jar:?] at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:583) [app.jar:?] at
> org.apache.kafka.streams.processor.internals.StreamThread.run(
> StreamThread.java:556) [app.jar:?] Caused by:
> org.apache.kafka.clients.consumer.CommitFailedException: Offset commit
> cannot be completed since the consumer is not part of an active group for
> auto partition assignment; it is likely that the consumer was kicked out of
> the group. at
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest
> (ConsumerCoordinator.java:1139) ~[app.jar:?] at
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync
> (ConsumerCoordinator.java:1004) ~[app.jar:?] at
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(
> KafkaConsumer.java:1490) ~[app.jar:?] at
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(
> KafkaConsumer.java:1438) ~[app.jar:?] at
>
> org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction
> (TaskManager.java:1139) ~[app.jar:?] ... 15 more
>


-- 
-- Guozhang


Re: [ANNOUNCE] Apache Kafka 3.0.0

2021-09-22 Thread Guozhang Wang
Kudos to Konstantine! Congrats to everyone.

On Tue, Sep 21, 2021 at 9:01 AM Konstantine Karantasis <
kkaranta...@apache.org> wrote:

> The Apache Kafka community is pleased to announce the release for Apache
> Kafka 3.0.0
>
> It is a major release that includes many new features, including:
>
> * The deprecation of support for Java 8 and Scala 2.12.
> * Kafka Raft support for snapshots of the metadata topic and other
> improvements in the self-managed quorum.
> * Deprecation of message formats v0 and v1.
> * Stronger delivery guarantees for the Kafka producer enabled by default.
> * Optimizations in OffsetFetch and FindCoordinator requests.
> * More flexible MirrorMaker 2 configuration and deprecation of MirrorMaker
> 1.
> * Ability to restart a connector's tasks on a single call in Kafka Connect.
> * Connector log contexts and connector client overrides are now enabled by
> default.
> * Enhanced semantics for timestamp synchronization in Kafka Streams.
> * Revamped public API for Stream's TaskId.
> * Default serde becomes null in Kafka Streams and several other
> configuration changes.
>
> You may read a more detailed list of features in the 3.0.0 blog post:
> https://blogs.apache.org/kafka/
>
> All of the changes in this release can be found in the release notes:
> https://downloads.apache.org/kafka/3.0.0/RELEASE_NOTES.html
>
> You can download the source and binary release (Scala 2.12 and 2.13) from:
> https://kafka.apache.org/downloads#3.0.0
>
>
> ---
>
>
> Apache Kafka is a distributed streaming platform with four core APIs:
>
>
> ** The Producer API allows an application to publish a stream of records to
> one or more Kafka topics.
>
> ** The Consumer API allows an application to subscribe to one or more
> topics and process the stream of records produced to them.
>
> ** The Streams API allows an application to act as a stream processor,
> consuming an input stream from one or more topics and producing an
> output stream to one or more output topics, effectively transforming the
> input streams to output streams.
>
> ** The Connector API allows building and running reusable producers or
> consumers that connect Kafka topics to existing applications or data
> systems. For example, a connector to a relational database might
> capture every change to a table.
>
>
> With these APIs, Kafka can be used for two broad classes of application:
>
> ** Building real-time streaming data pipelines that reliably get data
> between systems or applications.
>
> ** Building real-time streaming applications that transform or react
> to the streams of data.
>
>
> Apache Kafka is in use at large and small companies worldwide, including
> Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
> Target, The New York Times, Uber, Yelp, and Zalando, among others.
>
> A big thank you for the following 141 authors and reviewers to this
> release!
>
> A. Sophie Blee-Goldman, Adil Houmadi, Akhilesh Dubey, Alec Thomas,
> Alexander Iskuskov, Almog Gavra, Alok Nikhil, Alok Thatikunta, Andrew Lee,
> Bill Bejeck, Boyang Chen, Bruno Cadonna, CHUN-HAO TANG, Cao Manh Dat, Cheng
> Tan, Chia-Ping Tsai, Chris Egerton, Colin P. McCabe, Cong Ding, Daniel
> Urban, Daniyar Yeralin, David Arthur, David Christle, David Jacot, David
> Mao, David Osvath, Davor Poldrugo, Dejan Stojadinović, Dhruvil Shah, Diego
> Erdody, Dong Lin, Dongjoon Hyun, Dániel Urbán, Edoardo Comar, Edwin Hobor,
> Eric Beaudet, Ewen Cheslack-Postava, Gardner Vickers, Gasparina Damien,
> Geordie, Greg Harris, Gunnar Morling, Guozhang Wang, Gwen (Chen) Shapira,
> Ignacio Acuña Frías, Igor Soarez, Ismael Juma, Israel Ekpo, Ivan Ponomarev,
> Ivan Yurchenko, Jason Gustafson, Jeff Kim, Jim Galasyn, Jim Hurne, JoelWee,
> John Gray, John Roesler, Jorge Esteban Quilcate Otoya, Josep Prat, José
> Armando García Sancio, Juan Gonzalez-Zurita, Jun Rao, Justin Mclean,
> Justine Olshan, Kahn Cheny, Kalpesh Patel, Kamal Chandraprakash,
> Konstantine Karantasis, Kowshik Prakasam, Leah Thomas, Lee Dongjin, Lev
> Zemlyanov, Liu Qiang, Lucas Bradstreet, Luke Chen, Manikumar Reddy, Marco
> Aurelio Lotz, Matthew de Detrich, Matthias J. Sax, Michael G. Noll, Michael
> Noll, Mickael Maison, Nathan Lincoln, Niket Goel, Nikhil Bhatia, Omnia G H
> Ibrahim, Peng Lei, Phil Hardwick, Rajini Sivaram, Randall Hauch, Rohan
> Desai, Rohit Deshpande, Rohit Sachan, Ron Dagostino, Ryan Dielhenn, Ryanne
> Dolan, Sanjana Kaundinya, Sarwar Bhuiyan, Satish Duggana, Scott Hendricks,
> Sergio Peña, Shao Yang Hong, Shay Elkin, Stanislav Vodetskyi, Sven Erik
> Knop, Tom Bentley, UnityLung, Uwe Eisele, Vahid Has

Re: Subscribe for mailing list

2021-09-16 Thread Guozhang Wang
Hello Bhavik,

You need to send to a separate mailing list to subscribe:
https://kafka.apache.org/contact

On Thu, Sep 16, 2021 at 8:48 AM Bhavik Ambani 
wrote:

> Subscribe for mailing list
>


-- 
-- Guozhang


Re: High disk read with Kafka streams

2021-08-17 Thread Guozhang Wang
Hello Mangat,

Thanks for reporting your observations. I have some questions:

1) Are your global state stores also in-memory or persisted on disks?
2) Are your Kafka and KStreams colocated?


Guozhang

On Tue, Aug 10, 2021 at 6:10 AM mangat rai  wrote:

> Hey All,
>
> We are using the low level processor API to create kafka stream
> applications. Each app has 1 or more in-memory state stores with caching
> disabled and changelog enabled. Some of the apps also have global stores.
> We noticed from the node metrics (kubernetes) that the stream applications
> are consuming too much disk I/O. Digging deeper, I found the following:
>
> 1. Running locally with docker I could see some pretty high disk reads. I
> used `docker stats` and got `BLOCK I/O` as `438MB / 0B`. To compare we did
> only a few gigabytes of Net I/O.
> 2. In kubernetes, `container_fs_reads_bytes_total` gives us pretty big
> numbers whereas `container_fs_writes_bytes_total` is almost negligible.
>
> Now we are *not* using RocksDB. The pattern is not correlated to having a
> global store. I read various documents but I still can't figure out why a
> stream application would perform so much disk read. It's not even writing
> so that rules out the swap space or any buffering etc.
>
> I also noticed that a higher amount of data consumption is directly
> proportional to a higher amount of disk reads. Is it possible that the data
> is zero-copied from the network interface to the disk and the Kafka app is
> reading from it? I am not aware of any mechanism that would do that.
>
> I would really appreciate any help in debugging this issue.
>
> Thanks,
> Mangat
>


-- 
-- Guozhang


Re: DescribeTopics could return deleted topic

2021-08-17 Thread Guozhang Wang
Since DescribeTopics is just handled via a Metadata request behind the
scenes, it is possible to get a stale answer if the request is sent to a
broker with stale metadata (one that has not yet received the metadata
update request). But I would not expect it to "always" return the deleted
topic partition, unless the broker being queried is partitioned from the
controller and hence would never receive the metadata update request.
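
For anyone who wants to check this from the client side, here is a minimal sketch
(bootstrap server and topic name are placeholders) that issues a describe right after a
delete; the result comes from the contacted broker's metadata cache, so it may lag briefly:

```
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class DescribeAfterDelete {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // Served from the contacted broker's metadata cache: right after a delete this may
            // still describe the topic; once the cache catches up it fails with
            // UnknownTopicOrPartitionException.
            System.out.println(
                admin.describeTopics(Collections.singleton("my-topic")).all().get()); // placeholder topic
        }
    }
}
```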


Guozhang

On Sun, Aug 8, 2021 at 6:08 PM Boyang Chen 
wrote:

> Hey there,
>
> Has anyone experienced the case where the admin delete topic command was
> issued but the DescribeTopics command always returns the topic partition?
> What's the expected time for the topic metadata to disappear?
>
> Boyang
>


-- 
-- Guozhang


Re: NullPointerException after upgrading from 2.5.1 to 2.6.1 in my stream app

2021-06-27 Thread Guozhang Wang
Hello Nitay,

I have not heard of anyone else reporting similar issues that might point
to a bug. Maybe you could try to reproduce the issue by first starting a
brand-new app on 2.5, then following the upgrade path (with the config
overrides) to 2.6 and seeing whether it is easily reproducible; if it is,
could you create a JIRA ticket?
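
In the meantime, if the NPE really does come from null values reaching `_.split` (as
suggested earlier in this thread), the usual defensive fix is to filter them out before the
flatMap. A minimal sketch, written with the Java DSL purely for illustration (the original
app is in Scala, `inputStream` stands in for the stream from the original snippet, and the
comma split is a placeholder for the real split logic):

```
import java.util.Arrays;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Windowed;

KStream<Windowed<String>, String> guarded = inputStream
    .filter((key, value) -> value != null)                     // drop null payloads before splitting
    .flatMapValues(value -> Arrays.asList(value.split(",")));  // placeholder for the real split logic
```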

Guozhang

On Wed, Jun 23, 2021 at 3:50 PM Nitay Kufert  wrote:

> Bumping on the off chance that during this time some sort of bug was
> reported that might explain this behaviour... I will feel more comfortable
> bumping our Kafka versions this way :)
>
> On Wed, Feb 24, 2021 at 12:48 PM Nitay Kufert  wrote:
>
> > I guess it's possible but very unlikely, because it works perfectly with
> > all the previous versions and the current one (2.5.1)?
> > Why would a change in the version introduce nulls there?
> >
> > On Tue, Feb 23, 2021 at 9:16 PM Guozhang Wang 
> wrote:
> >
> >> Is it possible that the flattened values contain `null` and hence
> >> `_.split`
> >> throws?
> >>
> >> On Tue, Feb 23, 2021 at 8:23 AM Nitay Kufert 
> wrote:
> >>
> >> > Hey, missed your replay - but the code i've shared above the logs is
> the
> >> > code around those lines (removed some identifiers to make it a little
> >> bit
> >> > more generic):
> >> >
> >> > > inputStream.flatMapValues(_.split).peek((k, v) => { val _ = s"$k ->
> >> > > ${v.printForDebug}" }) # return type KStream[Windowed[String],
> >> > > SingleInputMessage]
> >> >
> >> >
> >> > On Fri, Jan 29, 2021 at 9:01 AM Guozhang Wang 
> >> wrote:
> >> >
> >> > > Could you share your code around
> >> > >
> >> > > >
> >> > >
> >> > >
> >> >
> >>
> com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> >> > >
> >> > > That seems to be where NPE is thrown.
> >> > >
> >> > >
> >> > > On Wed, Jan 13, 2021 at 5:46 AM Nitay Kufert 
> >> > wrote:
> >> > >
> >> > > > Hey,
> >> > > > *Without any code change*, just by bumping the kafka version from
> >> 2.5.1
> >> > > to
> >> > > > 2.6.1 (clients only) - my stream application started throwing
> >> > > > NullPointerException (sometimes, not in a predicted pattern).
> >> > > > Maybe it's worth mentioning that I also removed the "UPGRADE_FROM"
> >> conf
> >> > > > that was forgotten there from the older versions.
> >> > > >
> >> > > > We are using Scala 2.12, and the line that throws this exception
> is
> >> > using
> >> > > > flatMapValues:
> >> > > >
> >> > > >
> >> > > > >  inputStream.flatMapValues(_.split) # return type
> >> > > > > KStream[Windowed[String], SingleInputMessage]
> >> > > >
> >> > > >
> >> > > > Where inputStream is of type: KStream[Windowed[String],
> >> InputMessage]
> >> > and
> >> > > > the split method splits this InputMessage into several
> >> > > > SingleInputMessage messages (hence the flat - to avoid
> >> > > > List[SingleInputMessage]).
> >> > > >
> >> > > > The exception:
> >> > > >
> >> > > > > java.lang.NullPointerException: null Wrapped by:
> >> > > > > org.apache.kafka.streams.errors.StreamsException: Exception
> >> caught in
> >> > > > > process. taskId=2_2,
> >> > processor=unique_input_message-repartition-source,
> >> > > > > topic=service-unique_input_message-repartition, partition=2,
> >> > > > > offset=318846738, stacktrace=java.lang.NullPointerException
> >> > > > >
> >> > > >
> >> > > > java.lang.NullPointerException: null at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache

Re: Add me to contributors list of Apache Kafka

2021-06-27 Thread Guozhang Wang
Hello Alan,

I've added you to the contributors list and also assigned the ticket to you.

Cheers,
Guozhang

On Fri, Jun 25, 2021 at 5:19 PM Alan Artigao  wrote:

> Hi there!
>
> I'm working on KAFKA-12995
>  and I need to be in
> the contributors list of Apache Kafka in order to assign it to me.
>
> Username in JIRA is: *aartigao*
>
> Cheers!
>


-- 
-- Guozhang


Re: KStreams and multiple instance

2021-05-11 Thread Guozhang Wang
Hello Pietro,

1) If you are using the Streams DSL with an aggregation, it would
repartition the input streams by the aggregation field for data
parallelism, so multiple instances would be able to do the aggregation in
parallel and independently, with correct results.
2) The short answer is "probably": you can use different state store
implementations for materializing the running aggregates.

You can read some general docs if you have further questions:
https://kafka.apache.org/28/documentation/streams/architecture
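
To make 1) concrete, here is a minimal DSL sketch for the scenario in the question (topic
names are taken from the question; the Long value type and serdes are placeholder
assumptions). The DSL takes care of any repartitioning and materializes the running sum
in a changelog-backed state store:

```
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();
builder.stream("ordersTopic", Consumed.with(Serdes.String(), Serdes.Long()))
       // groupByKey keeps the existing key; if you re-key (groupBy/selectKey), Streams
       // inserts a repartition topic so each instance still sees all records for a key.
       .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
       .reduce(Long::sum)       // running sum per key, backed by a changelog topic
       .toStream()
       .to("countTopic", Produced.with(Serdes.String(), Serdes.Long()));
```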


Guozhang

On Thu, May 6, 2021 at 8:40 AM Pietro Galassi 
wrote:

> Hi all,
> I hope you can help me figure out this scenario.
>
> I have a multi-instance microservice that consumes from a topic
> (ordersTopic); all instances use the same consumer group.
>
> This microservice uses a KStream to aggregate (sum) topic events and
> produces results on another topic (countTopic).
>
> I have two questions:
>
> 1) Can I have problems with the counts due to multiple instances of the same
> microservice?
> 2) Do I need RocksDB and a materialized view in order to store the data?
>
> Thanks a lot.
> Regards,
> Pietro Galassi
>


-- 
-- Guozhang


Re: Kafka Stream: State replication seems unpredictable.

2021-05-03 Thread Guozhang Wang
Hello Mohan,

Sorry for getting to this thread late. Just to revisit your concerns here:
if your topology produces no output at all to any topics (sink topics,
changelog topics), then yes, the zombie would not be detected; but on the
other hand the topology itself does not make any visible changes to
external systems anyway -- you can just think of the zombie as doing
redundant work and then dropping the results on the floor, since nothing
is reflected outside.

On the other hand, if the tasks are at least writing to some sink topics,
then zombies would still be detected.


Guozhang

On Thu, Apr 22, 2021 at 10:47 AM Parthasarathy, Mohan 
wrote:

> Guozhang,
>
> What does this mean if the changelog topic was disabled ? If thread 2 and
> thread 4 are running in two different nodes and a rebalance occurs, thread
> 2 will not realize it is a zombie without the write to the changelog topic,
> right ? I am trying to understand the cases under which the changelog topic
> can ever be disabled.
>
> Thanks
> Mohan
>
>
> On 4/21/21, 10:22 PM, "Guozhang Wang"  wrote:
>
> Hello Mangat,
>
> I think using a persistent store instead of an in-memory store could
> help
> if the threads are from the same instance.
>
> Guozhang
>
> On Tue, Apr 20, 2021 at 12:54 AM mangat rai 
> wrote:
>
> > Hey Guozhang,
> >
> > Thanks for creating the issue. Yes, you are right, this will happen
> only
> > with the consecutive rebalancing as after some time zombie thread
> will stop
> > and re-join the group and the new thread will always overwrite the
> state
> > with the latest data. In our poor infra setup, the rebalancing was
> > happening many times in a row.
> >
> > Now, we can't guarantee that the consecutive rebalancing will not
> happen
> > again (we reduced fetch-size which fixed it in many ways), will any
> of the
> > following work as a workaround?
> >
> > 1. Use persistent store instead of in-memory. The new thread will
> never get
> > the lock hence we will lose availability but keep things consistent.
> > 2. Use exactly-once semantics. However, we might need to redesign
> our apps.
> > It's a bigger change.
> >
> > Regards,
> > Mangat
> >
> > On Tue, Apr 20, 2021 at 6:50 AM Guozhang Wang 
> wrote:
> >
> > > Hello Mangat,
> > >
> > > What you've encountered is a "zombie writer" issue, that is,
> Thread-2 did
> > > not know there's already a new rebalance and hence its partitions
> have
> > been
> > > migrated out, until it tries to commit and then got notified of the
> > > illegal-generation error and realize itself is the "zombie"
> already. This
> > > case would still persist even with incremental rebalancing.
> > >
> > > I've filed https://issues.apache.org/jira/browse/KAFKA-12693  to
> > summarize
> > > the situation. Please LMK if that explanation is clear to you.
> > >
> > > On Mon, Apr 19, 2021 at 12:58 AM mangat rai 
> > wrote:
> > >
> > > > Thanks, Guozhang,
> > > >
> > > > I was knocking myself with Kafka's various consumer rebalancing
> > > algorithms
> > > > in the last 2 days. Could I generalize this problem as
> > > >
> > > >
> > > >
> > > > *Any in-memory state store backed by a changelog topic will
> always risk
> > > > having interleaved writes from two different writers during
> > rebalancing?*
> > > > In our case, CPU throttling made it worse as thread-2 didn't try
> to
> > > commit
> > > > for a long time. Also,
> > > >
> > > > 1. Do you think if we disable the incremental rebalancing, we
> will not
> > > have
> > > > this issue because If I understood correctly Thread-4 will not
> start
> > > > processing until the state is completely transferred from
> Thread-2.
> > > > 2. If yes, how can we disable it without downgrading the client?
> > > >
> > > > Since we have a very low scale and no real-time computing
> requirement,
> > we
> > > > will be happy to sacrifice the availability to have consistency.
> > > >
> > > > Regards,
> > > > Mangat
> > > >
> > > >
> > > &

Re: Exceptions in kafka streams

2021-05-03 Thread Guozhang Wang
Hello Dinesh,

It seems you're still using version 1.0 of Kafka Streams with EOS enabled.
Could you try upgrading to a newer version (2.6+) and see if this issue
goes away?
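
For reference, a sketch of the EOS-related setting on a 2.6+ client (application id and
bootstrap servers are placeholders):

```
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
// On 2.6+ clients with 2.5+ brokers, the improved EOS from KIP-447 is also available:
// props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
```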

On Mon, May 3, 2021 at 8:55 AM Dinesh Raj  wrote:

> Hi,
>
> I am getting too many exceptions whenever the kafka streams application is
> scaled out or scaled down.
>
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> > attempted to produce with an old epoch.
> > Written offsets would not be recorded and no more records would be sent
> > since the producer is fenced, indicating the task may be migrated out; it
> > means all tasks belonging to this thread should be migrated.
> > at
> >
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:206)
> > ~[RTBKafkaStreams-1.0-SNAPSHOT-all.jar:?]
> > at
> >
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:187)
> > ~[RTBKafkaStreams-1.0-SNAPSHOT-all.jar:?]
> > at
> >
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1366)
> > ~[RTBKafkaStreams-1.0-SNAPSHOT-all.jar:?]
> > at
> >
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
> > ~[RTBKafkaStreams-1.0-SNAPSHOT-all.jar:?]
> > at
> >
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)
> > ~[RTBKafkaStreams-1.0-SNAPSHOT-all.jar:?]
> > at
> >
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:690)
> > ~[RTBKafkaStreams-1.0-SNAPSHOT-all.jar:?]
> > at
> >
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:676)
> > ~[RTBKafkaStreams-1.0-SNAPSHOT-all.jar:?]
> > at
> >
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634)
> > ~[RTBKafkaStreams-1.0-SNAPSHOT-all.jar:?]
> > at
> >
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:568)
> > ~[RTBKafkaStreams-1.0-SNAPSHOT-all.jar:?]
> > at
> >
> org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$0(Sender.java:757)
> > ~[RTBKafkaStreams-1.0-SNAPSHOT-all.jar:?]
> > at
> >
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> > ~[RTBKafkaStreams-1.0-SNAPSHOT-all.jar:?]
> > at
> >
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584)
> > ~[RTBKafkaStreams-1.0-SNAPSHOT-all.jar:?]
> > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576)
> > ~[RTBKafkaStreams-1.0-SNAPSHOT-all.jar:?]
> > at
> >
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
> > ~[RTBKafkaStreams-1.0-SNAPSHOT-all.jar:?]
> > at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
> > ~[RTBKafkaStreams-1.0-SNAPSHOT-all.jar:?]
> > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_131]
> > Caused by: org.apache.kafka.common.errors.InvalidProducerEpochException:
> > Producer attempted to produce with an old epoch.
> > 13:35:23.343
> >
> [bids_kafka_streams_beta_0001-a1ab0588-b1c2-4cd9-b02a-ccfed7594237-StreamThread-15]
> > ERROR org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> > [Consumer
> >
> clientId=bids_kafka_streams_beta_0001-a1ab0588-b1c2-4cd9-b02a-ccfed7594237-StreamThread-15-consumer,
> > groupId=bids_kafka_streams_beta_0001] LeaveGroup request with
> > Generation{generationId=169,
> >
> memberId='bids_kafka_streams_beta_0001-a1ab0588-b1c2-4cd9-b02a-ccfed7594237-StreamThread-15-consumer-bc1dee7e-dad7-4eff-a251-6690dedb4493',
> > protocol='stream'} failed with error: The coordinator is not aware of
> this
> > member.
> >
>
> I think this exception is not fatal, as the topology keeps running, but it
> is surely slowed down. And it never settles; these exceptions keep coming
> even after half an hour. My understanding is that once rebalancing is
> completed and tasks are migrated, I should stop getting this exception.
> Please help.
>
> Thanks & Regards,
> Dinesh Raj
>


-- 
-- Guozhang


Re: Kafka Stream: State replication seems unpredictable.

2021-04-21 Thread Guozhang Wang
Hello Mangat,

I think using a persistent store instead of an in-memory store could help
if the threads are from the same instance.
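
A minimal sketch of option 1 from the quoted message -- swapping the in-memory supplier
for a persistent one so that the state-directory file lock prevents two threads on the same
instance from writing concurrently (store name, serdes and processor name are placeholders):

```
import java.util.Collections;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

StoreBuilder<KeyValueStore<String, String>> persistentStore =
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("agg-store"),   // was: Stores.inMemoryKeyValueStore("agg-store")
            Serdes.String(),
            Serdes.String())
        .withCachingDisabled()
        .withLoggingEnabled(Collections.emptyMap());
// Registered as before, e.g. topology.addStateStore(persistentStore, "aggregator-processor");
```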

Guozhang

On Tue, Apr 20, 2021 at 12:54 AM mangat rai  wrote:

> Hey Guozhang,
>
> Thanks for creating the issue. Yes, you are right, this will happen only
> with the consecutive rebalancing as after some time zombie thread will stop
> and re-join the group and the new thread will always overwrite the state
> with the latest data. In our poor infra setup, the rebalancing was
> happening many times in a row.
>
> Now, we can't guarantee that the consecutive rebalancing will not happen
> again (we reduced fetch-size which fixed it in many ways), will any of the
> following work as a workaround?
>
> 1. Use persistent store instead of in-memory. The new thread will never get
> the lock hence we will lose availability but keep things consistent.
> 2. Use exactly-once semantics. However, we might need to redesign our apps.
> It's a bigger change.
>
> Regards,
> Mangat
>
> On Tue, Apr 20, 2021 at 6:50 AM Guozhang Wang  wrote:
>
> > Hello Mangat,
> >
> > What you've encountered is a "zombie writer" issue, that is, Thread-2 did
> > not know there's already a new rebalance and hence its partitions have
> been
> > migrated out, until it tries to commit and then got notified of the
> > illegal-generation error and realize itself is the "zombie" already. This
> > case would still persist even with incremental rebalancing.
> >
> > I've filed https://issues.apache.org/jira/browse/KAFKA-12693 to
> summarize
> > the situation. Please LMK if that explanation is clear to you.
> >
> > On Mon, Apr 19, 2021 at 12:58 AM mangat rai 
> wrote:
> >
> > > Thanks, Guozhang,
> > >
> > > I was knocking myself with Kafka's various consumer rebalancing
> > algorithms
> > > in the last 2 days. Could I generalize this problem as
> > >
> > >
> > >
> > > *Any in-memory state store backed by a changelog topic will always risk
> > > having interleaved writes from two different writers during
> rebalancing?*
> > > In our case, CPU throttling made it worse as thread-2 didn't try to
> > commit
> > > for a long time. Also,
> > >
> > > 1. Do you think if we disable the incremental rebalancing, we will not
> > have
> > > this issue because If I understood correctly Thread-4 will not start
> > > processing until the state is completely transferred from Thread-2.
> > > 2. If yes, how can we disable it without downgrading the client?
> > >
> > > Since we have a very low scale and no real-time computing requirement,
> we
> > > will be happy to sacrifice the availability to have consistency.
> > >
> > > Regards,
> > > Mangat
> > >
> > >
> > >
> > > On Sat, Apr 17, 2021 at 12:27 AM Guozhang Wang 
> > wrote:
> > >
> > > > Hi Mangat:
> > > >
> > > > I think I found the issue of your problem here.
> > > >
> > > > It seems thread-2's partition was assigned to thread-4 while thread-2
> > was
> > > > not aware (because it missed a rebalance, this is normal scenario);
> in
> > > > other words, thread2 becomes a "zombie". It would stay in that zombie
> > > state
> > > > until it tried to commit, in which it would get an error from the
> > brokers
> > > > and realize its zombie identity and re-joins the group.
> > > >
> > > > During that period of time, before the commit was issued, it would
> > > continue
> > > > trying to write to its local states; here are several scenarios:
> > > >
> > > > 1) if thread-2/4 are belonging to two different nodes then that is
> > fine,
> > > > since they will write to different local state stores.
> > > > 2) if they belong to the same nodes, and
> > > >a) the state stores are persistent then they would have risks of
> > > > contention; this is guarded by the state directory locks (as file
> > locks)
> > > in
> > > > which case the new owner thread-4 should not be able to get on the
> > local
> > > > state files.
> > > >b) the state stores are in-memory, in which case that is fine
> since
> > > the
> > > > in-memory stores are kept separate as well.
> > > >
> > > > In your case: 2.b), the issue is that the changelog would still be
> > shared
> > > > between the two

Re: Kafka Stream: State replication seems unpredictable.

2021-04-19 Thread Guozhang Wang
Hello Mangat,

What you've encountered is a "zombie writer" issue: Thread-2 did not know
there had already been a new rebalance and hence its partitions had been
migrated out, until it tried to commit, got notified of the
illegal-generation error, and realized it was the "zombie". This case
would still persist even with incremental rebalancing.

I've filed https://issues.apache.org/jira/browse/KAFKA-12693 to summarize
the situation. Please LMK if that explanation is clear to you.

On Mon, Apr 19, 2021 at 12:58 AM mangat rai  wrote:

> Thanks, Guozhang,
>
> I was knocking myself with Kafka's various consumer rebalancing algorithms
> in the last 2 days. Could I generalize this problem as
>
>
>
> *Any in-memory state store backed by a changelog topic will always risk
> having interleaved writes from two different writers during rebalancing?*
> In our case, CPU throttling made it worse as thread-2 didn't try to commit
> for a long time. Also,
>
> 1. Do you think if we disable the incremental rebalancing, we will not have
> this issue because If I understood correctly Thread-4 will not start
> processing until the state is completely transferred from Thread-2.
> 2. If yes, how can we disable it without downgrading the client?
>
> Since we have a very low scale and no real-time computing requirement, we
> will be happy to sacrifice the availability to have consistency.
>
> Regards,
> Mangat
>
>
>
> On Sat, Apr 17, 2021 at 12:27 AM Guozhang Wang  wrote:
>
> > Hi Mangat:
> >
> > I think I found the issue of your problem here.
> >
> > It seems thread-2's partition was assigned to thread-4 while thread-2 was
> > not aware (because it missed a rebalance, this is normal scenario); in
> > other words, thread2 becomes a "zombie". It would stay in that zombie
> state
> > until it tried to commit, in which it would get an error from the brokers
> > and realize its zombie identity and re-joins the group.
> >
> > During that period of time, before the commit was issued, it would
> continue
> > trying to write to its local states; here are several scenarios:
> >
> > 1) if thread-2/4 are belonging to two different nodes then that is fine,
> > since they will write to different local state stores.
> > 2) if they belong to the same nodes, and
> >a) the state stores are persistent then they would have risks of
> > contention; this is guarded by the state directory locks (as file locks)
> in
> > which case the new owner thread-4 should not be able to get on the local
> > state files.
> >b) the state stores are in-memory, in which case that is fine since
> the
> > in-memory stores are kept separate as well.
> >
> > In your case: 2.b), the issue is that the changelog would still be shared
> > between the two --- but note that this is the same case as in case 1) as
> > well. And this means at that time the changelog is shared by two writers
> > sending records interleaving. And if there’s a tombstone that was
> intended
> > for a record A, but when it was written interleaving and there’s another
> > record B in between, that tombstone would effectively delete record B.
> The
> > key here is that, when we replay the changelogs, we replay it completely
> > following offset ordering.
> >
> >
> >
> > On Thu, Apr 15, 2021 at 2:28 AM mangat rai  wrote:
> >
> > > Guozhang,
> > >
> > > Yes, you are correct. We have our own group processor. I have more
> > > information now.
> > >
> > > 1. I added ThreadId in the data when the app persists into the
> changelog
> > > topic.
> > > 2. Thread-2 which was working with partition-0 had a timeout issue.
> > > 4. Thread-4 picked up this partition-0 as I can see its Id in the
> > > changelog.
> > > 5. *But then Thread-2 and Thread-4 both were writing into the
> partition-0
> > > of the changelog, that too for the same key.*
> > >
> > > So I was clearly able to see that two threads were overwriting data of
> > one
> > > another into the state store leading to a corrupted state. This
> confirms
> > my
> > > theory that it was an issue of concurrent update. This was something
> > > totally unexpected. I suspect that Thread-2 continues to persist its
> > > in-memory state, maybe because It wasn't stopped after the timeout
> > > exception. Is there a configuration possible in the Kafka stream which
> > > could lead to this?
> > >
> > > There was no network issue, our CPU was highly throttled by Kubernetes.
&

Re: [ANNOUNCE] Apache Kafka 2.8.0

2021-04-19 Thread Guozhang Wang
This is great! Thanks to everyone who has contributed to the release.

On Mon, Apr 19, 2021 at 9:36 AM John Roesler  wrote:

> The Apache Kafka community is pleased to announce the
> release for Apache Kafka 2.8.0
>
> Kafka 2.8.0 includes a number of significant new features.
> Here is a summary of some notable changes:
>
> * Early access of replacing ZooKeeper with a self-managed
> quorum
> * Add Describe Cluster API
> * Support mutual TLS authentication on SASL_SSL listeners
> * JSON request/response debug logs
> * Limit broker connection creation rate
> * Topic identifiers
> * Expose task configurations in Connect REST API
> * Update Streams FSM to clarify ERROR state meaning
> * Extend StreamJoined to allow more store configs
> * More convenient TopologyTestDriver constructors
> * Introduce Kafka-Streams-specific uncaught exception
> handler
> * API to start and shut down Streams threads
> * Improve TimeWindowedDeserializer and TimeWindowedSerde to
> handle window size
> * Improve timeouts and retries in Kafka Streams
>
> All of the changes in this release can be found in the
> release notes:
> https://www.apache.org/dist/kafka/2.8.0/RELEASE_NOTES.html
>
>
> You can download the source and binary release (Scala 2.12
> and 2.13) from:
> https://kafka.apache.org/downloads#2.8.0
>
> 
> ---
>
>
> Apache Kafka is a distributed streaming platform with four
> core APIs:
>
>
> ** The Producer API allows an application to publish a
> stream of records to one or more Kafka topics.
>
> ** The Consumer API allows an application to subscribe to
> one or more topics and process the stream of records
> produced to them.
>
> ** The Streams API allows an application to act as a stream
> processor, consuming an input stream from one or more topics
> and producing an output stream to one or more output topics,
> effectively transforming the input streams to output
> streams.
>
> ** The Connector API allows building and running reusable
> producers or consumers that connect Kafka topics to existing
> applications or data systems. For example, a connector to a
> relational database might capture every change to a table.
>
>
> With these APIs, Kafka can be used for two broad classes of
> application:
>
> ** Building real-time streaming data pipelines that reliably
> get data between systems or applications.
>
> ** Building real-time streaming applications that transform
> or react to the streams of data.
>
>
> Apache Kafka is in use at large and small companies
> worldwide, including Capital One, Goldman Sachs, ING,
> LinkedIn, Netflix, Pinterest, Rabobank, Target, The New York
> Times, Uber, Yelp, and Zalando, among others.
>
> A big thank you to the following 128 contributors to this
> release!
>
> 17hao, abc863377, Adem Efe Gencer, Alexander Iskuskov, Alok
> Nikhil, Anastasia Vela, Andrew Lee, Andrey Bozhko, Andrey
> Falko, Andy Coates, Andy Wilkinson, Ankit Kumar, APaMio,
> Arjun Satish, ArunParthiban-ST, A. Sophie Blee-Goldman,
> Attila Sasvari, Benoit Maggi, bertber, bill, Bill Bejeck,
> Bob Barrett, Boyang Chen, Brajesh Kumar, Bruno Cadonna,
> Cheng Tan, Chia-Ping Tsai, Chris Egerton, CHUN-HAO TANG,
> Colin Patrick McCabe, Colin P. Mccabe, Cyrus Vafadari, David
> Arthur, David Jacot, David Mao, dengziming, Dhruvil Shah,
> Dima Reznik, Dongjoon Hyun, Dongxu Wang, Emre Hasegeli,
> feyman2016, fml2, Gardner Vickers, Geordie, Govinda Sakhare,
> Greg Harris, Guozhang Wang, Gwen Shapira, Hamza Slama,
> high.lee, huxi, Igor Soarez, Ilya Ganelin, Ismael Juma, Ivan
> Ponomarev, Ivan Yurchenko, jackyoh, James Cheng, James
> Yuzawa, Jason Gustafson, Jesse Gorzinski, Jim Galasyn, John
> Roesler, Jorge Esteban Quilcate Otoya, José Armando García
> Sancio, Julien Chanaud, Julien Jean Paul Sirocchi, Justine
> Olshan, Kengo Seki, Kowshik Prakasam, leah, Lee Dongjin,
> Levani Kokhreidze, Lev Zemlyanov, Liju John, Lincong Li,
> Lucas Bradstreet, Luke Chen, Manikumar Reddy, Marco Aurelio
> Lotz, mathieu, Matthew Wong, Matthias J. Sax, Matthias
> Merdes, Michael Bingham, Michael G. Noll, Mickael Maison,
> Montyleo, mowczare, Nikolay, Nikolay Izhikov, Ning Zhang,
> Nitesh Mor, Okada Haruki, panguncle, parafiend, Patrick
> Dignan, Prateek Agarwal, Prithvi, Rajini Sivaram, Raman
> Verma, Ramesh Krishnan M, Randall Hauch, Richard
> Fussenegger, Rohan, Rohit Deshpande, Ron Dagostino, Samuel
> Cantero, Sanket Fajage, Scott Hendricks, Shao Yang Hong,
> ssugar, Stanislav Kozlovski, Stanislav Vodetskyi, tang7526,
> Thorsten Hake, Tom Bentley, vamossagar12, Viktor Somogyi-
> Vass, voffcheg109, Walker Carlson, wenbingshen, wycc,
> xakassi, Xavier Léauté, Yilong Chang, zhangyue19921010
>
> We welcome your help and feedback. For more information on
> how to report problems, and to get involved, visit the
> project website at https://kafka.apache.org/
>
> Thank you!
>
>
> Regards,
>
> John Roesler
>
>

-- 
-- Guozhang


Re: Kafka Stream: State replication seems unpredictable.

2021-04-16 Thread Guozhang Wang
Hi Mangat:

I think I found the cause of your problem here.

It seems thread-2's partition was assigned to thread-4 while thread-2 was
not aware of it (because it missed a rebalance; this is a normal scenario);
in other words, thread-2 became a "zombie". It would stay in that zombie
state until it tried to commit, at which point it would get an error from
the brokers, realize its zombie identity, and re-join the group.

During that period of time, before the commit was issued, it would continue
trying to write to its local state; here are several scenarios:

1) if thread-2/4 belong to two different nodes, then that is fine, since
they will write to different local state stores.
2) if they belong to the same node, and
   a) the state stores are persistent, then there would be a risk of
contention; this is guarded by the state directory locks (file locks), in
which case the new owner thread-4 should not be able to get at the local
state files.
   b) the state stores are in-memory, in which case that is fine since the
in-memory stores are kept separate as well.

In your case, 2.b), the issue is that the changelog would still be shared
between the two --- and note that this is also true in case 1). This means
that during that time the changelog is shared by two writers sending
records interleaved. If there's a tombstone that was intended for a record
A, but it was written interleaved and another record B landed in between,
that tombstone would effectively delete record B. The key here is that
when we replay the changelogs, we replay them completely following offset
ordering.



On Thu, Apr 15, 2021 at 2:28 AM mangat rai  wrote:

> Guozhang,
>
> Yes, you are correct. We have our own group processor. I have more
> information now.
>
> 1. I added ThreadId in the data when the app persists into the changelog
> topic.
> 2. Thread-2 which was working with partition-0 had a timeout issue.
> 4. Thread-4 picked up this partition-0 as I can see its Id in the
> changelog.
> 5. *But then Thread-2 and Thread-4 both were writing into the partition-0
> of the changelog, that too for the same key.*
>
> So I was clearly able to see that two threads were overwriting data of one
> another into the state store leading to a corrupted state. This confirms my
> theory that it was an issue of concurrent update. This was something
> totally unexpected. I suspect that Thread-2 continues to persist its
> in-memory state, maybe because It wasn't stopped after the timeout
> exception. Is there a configuration possible in the Kafka stream which
> could lead to this?
>
> There was no network issue, our CPU was highly throttled by Kubernetes. We
> gave more resources, also decreased the fetch-size so we have more I/O to
> Cpu time ratio than before, and then there was no timeout issue, hence no
> reassignment and hence no corrupted state.
>
> I really appreciate your help here...
> Thanks!
> Mangat
>
>
> On Wed, Apr 14, 2021 at 8:48 PM Guozhang Wang  wrote:
>
> > Hey Mangat,
> >
> > A. With at least once, Streams does not make sure atomicity of 1) / 2);
> > with exactly once, atomicity is indeed guaranteed with transactional
> > messaging.
> >
> > B. If you are using processor API, then I'm assuming you did your own
> > group-by processor right? In that case, the partition key would just be
> the
> > record key when you are sending to the repartition topic.
> >
> >
> > Guozhang
> >
> >
> >
> >
> > On Thu, Apr 8, 2021 at 9:00 AM mangat rai  wrote:
> >
> > > Thanks again, that makes things clear. I still have some questions here
> > > then.
> > >
> > > A.  For each record we read, we do two updates
> > >   1. Changelog topic of the state store.
> > >   2. Output topic aka sink.
> > >   Does the Kafka stream app make sure that either both are
> committed
> > or
> > > neither?
> > >
> > > B.  Out Input topic actually has the as (a,b,c), but we partition with
> > only
> > > (a). We do this because we have different compaction requirements than
> > the
> > > partitions. It will still work as all (a,b,c) records will go to the
> same
> > > partition. Now in aggregation, we group by (a,b,c). In such case what
> > will
> > > be the partition key for the changelog topic?
> > >
> > > Note that we use low-level processor API and don't commit ourselves.
> > >
> > > Regards,
> > > Mangat
> > >
> > >
> > >
> > >
> > > On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang 
> wrote:
> > >
> > > > Hi Mangat,
> > > >
> > > > Plea

Re: Kafka Stream: State replication seems unpredictable.

2021-04-14 Thread Guozhang Wang
Hey Mangat,

A. With at-least-once, Streams does not ensure atomicity of 1) / 2);
with exactly-once, atomicity is indeed guaranteed via transactional
messaging.

B. If you are using the Processor API, then I'm assuming you wrote your own
group-by processor, right? In that case, the partition key would just be
the record key when you send to the repartition topic.


Guozhang




On Thu, Apr 8, 2021 at 9:00 AM mangat rai  wrote:

> Thanks again, that makes things clear. I still have some questions here
> then.
>
> A.  For each record we read, we do two updates
>   1. Changelog topic of the state store.
>   2. Output topic aka sink.
>   Does the Kafka stream app make sure that either both are committed or
> neither?
>
> B.  Out Input topic actually has the as (a,b,c), but we partition with only
> (a). We do this because we have different compaction requirements than the
> partitions. It will still work as all (a,b,c) records will go to the same
> partition. Now in aggregation, we group by (a,b,c). In such case what will
> be the partition key for the changelog topic?
>
> Note that we use low-level processor API and don't commit ourselves.
>
> Regards,
> Mangat
>
>
>
>
> On Thu, Apr 8, 2021 at 5:37 PM Guozhang Wang  wrote:
>
> > Hi Mangat,
> >
> > Please see my replies inline below.
> >
> > On Thu, Apr 8, 2021 at 5:34 AM mangat rai  wrote:
> >
> > > @Guozhang Wang
> > >
> > > Thanks for the reply.  Indeed I am finding it difficult to explain this
> > > state. I checked the code many times. There can be a bug but I fail to
> > see
> > > it. There are several things about the Kafka streams that I don't
> > > understand, which makes it harder for me to reason.
> > >
> > > 1. What is the partition key for the changelog topics? Is it the same
> as
> > > the Input key or the state store key? Or maybe the thread specifies the
> > > partition as it knows the input partition it is subscribed to? If the
> > input
> > > topic and state store are differently partitioned then we can explain
> the
> > > issue here.
> > >
> >
> > In Kafka Stream's changelog, the "partition key" of Kafka messages is the
> > same as the "message key" itself. And the message key is the same as the
> > state store key.
> >
> > Since the state store here should be storing the running aggregate, it
> > means that the partition key is the same as the aggregated key.
> >
> > If you are doing a group-by aggregation here, where the group-by keys are
> > different from the source input topic's keys, hence the state store keys
> > would be different with the input topic keys.
> >
> >
> > > 2. Is there a background thread to persist in the state store when
> > caching
> > > is disabled? When will the app commit the log for the input topic? Is
> it
> > > when sink writes into the output topic or when the state store writes
> > into
> > > the changelog topic? Because, if the app commits the record before the
> > data
> > > was written to changelog topic then we can again explain this state
> > >
> > > The commit happens *after* the local state store, as well as the
> > changelog records sent by the Streams' producers, have been flushed. I.e.
> > if there's a failure in between, you would re-process some source records
> > and hence cause duplicates, but no data loss (a.k.a. the at_least_once
> > semantics).
> >
> >
> >
> > > >You may also consider upgrading to 2.6.x or higher version and see if
> > this
> > > issue goes away.
> > > Do you mean the client or the Kafka broker? I will be upgrading the
> > client
> > > to 2.7.0 soon.
> > >
> > > I meant the client.
> >
> >
> > > Sadly looking into the timestamp will not help much as we use some
> > business
> > > time field to set the record timestamp. If I am right, there is no way
> > now
> > > to know that when a Producer wrote a record in a Kafka topic.
> > >
> > > Regards,
> > > Mangat
> > >
> > >
> > >
> > > On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang 
> wrote:
> > >
> > > > Hello Mangat,
> > > >
> > > > With at least once, although some records maybe processed multiple
> > times
> > > > their process ordering should not be violated, so what you observed
> is
> > > not
> > > > expected. What caught my eyes are this section in your output
> > changelogs

Re: Yet Another Repartitioning Question About Kafka Streams

2021-04-14 Thread Guozhang Wang
Hello Gareth,

There is a checkpoint file, co-located within the state directory, that
records the corresponding changelog offset for the state store data; after
the partition is migrated to new owners, this checkpoint file, along with
the state store, is not deleted immediately but follows a cleanup delay
policy.
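
That cleanup delay is controlled by a Streams config; a minimal sketch (the value shown is
only an example, the default is 10 minutes):

```
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// How long a task's local state directory (including the checkpoint file) may sit
// unused before the cleanup thread deletes it; default is 600000 ms (10 minutes).
props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 30 * 60 * 1000L);
```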

Guozhang

On Sun, Apr 11, 2021 at 11:13 AM Gareth Collins 
wrote:

> Hi Guozheng,
>
> Thanks very much again for the answers!
>
> One follow-up on the first question. Just so I understand it, how would it
> know where to continue from?
> I would assume that once we repartition, the new node will own the position
> in the consumer group for the relevant partition(s)
> so Kafka/Zookeeper would not know the position of the dead node anymore. Is
> the position also stored in RocksDB too somehow?
>
> thanks in advance,
> Gareth
>
>
>
>
> On Mon, Apr 5, 2021 at 6:34 PM Guozhang Wang  wrote:
>
> > Hello Gareth,
> >
> > 1) For this scenario, its state should be reusable and we do not need to
> > read from scratch from Kafka to rebuild.
> >
> > 2) "Warmup replicas" is just a special standby replica that is temporary,
> > note that if there's no partition migration needed at the moment, the
> > num.warmup.replicas is actually zero; the difference of
> > `max.warmup.replicas` config and the `num.standby.replicas` config is
> that,
> > the former is a global limit number, while the latter is a per task
> number.
> > I.e. if you have a total of N tasks, and you have these configs set as P
> > and Q, then during normal processing you'll have (Q+1) * N total
> replicas,
> > while during a rebalance you may have up to (Q+1) * N + P total replicas.
> > As you can see now, setting P to a larger than one value means that a
> > single rebalance run may be able to warm-up multiple partitions yet to be
> > moved with the cost of more space temporarily, while having a smaller
> > number means you may need more rounds of rebalances to achieve the end
> > rebalance goal.
> >
> > 3) Yes, if there are standby replicas, then you can still access
> > standby's states via IQ. You can read this KIP for more details:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> >
> >
> > Guozhang
> >
> > On Sun, Apr 4, 2021 at 12:41 PM Gareth Collins <
> gareth.o.coll...@gmail.com
> > >
> > wrote:
> >
> > > Hi,
> > >
> > > Thanks very much for answers to my previous questions here.
> > >
> > > I had a couple more questions about repartitioning and I just want to
> > > confirm my understanding.
> > >
> > > (1) Given the following scenario:
> > >
> > > (a) I have a cluster of Kafka stream nodes with partitions assigned to
> > > each.
> > >
> > > (b) One node goes down...and it goes down for long enough that a
> > > repartition happens (i.e. a time greater than
> > > scheduled.rebalance.max.delay.ms passes by).
> > >
> > > (c) Then the node finally comes back. If the state is still there can
> it
> > > still be used (assuming it is assigned the same partitions)...and only
> > the
> > > delta read from Kafka? Or will it need to read everything again to
> > rebuild
> > > the state? I assume it has to re-read the state but I want to make
> sure.
> > >
> > > (2) I understand warmup replicas help with minimizing downtime. If I
> > > understand correctly, if I have at least one warmup replica configured
> > and
> > > if the state needed to be rebuilt from scratch in the scenario above,
> > > switchover back to the old node will be delayed until the rebuild is
> > > complete. Is my understanding correct? If my understanding is correct,
> > why
> > > would you ever set more than one warmup replica? Or should warmup
> > replicas
> > > usually be equal to standby replicas + 1 just in case multiple nodes
> are
> > > stood up simultaneously?
> > >
> > > (3) If I set the scheduled rebalance delay to be greater than 0 and a
> > node
> > > goes down, will I be able to access the state data from other replicas
> > > while I am waiting for the rebalance?
> > >
> > > Any answers would be greatly appreciated.
> > >
> > > thanks,
> > > Gareth
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: Kafka Stream: State replication seems unpredictable.

2021-04-08 Thread Guozhang Wang
Hi Mangat,

Please see my replies inline below.

On Thu, Apr 8, 2021 at 5:34 AM mangat rai  wrote:

> @Guozhang Wang
>
> Thanks for the reply.  Indeed I am finding it difficult to explain this
> state. I checked the code many times. There can be a bug but I fail to see
> it. There are several things about the Kafka streams that I don't
> understand, which makes it harder for me to reason.
>
> 1. What is the partition key for the changelog topics? Is it the same as
> the Input key or the state store key? Or maybe the thread specifies the
> partition as it knows the input partition it is subscribed to? If the input
> topic and state store are differently partitioned then we can explain the
> issue here.
>

In Kafka Streams' changelogs, the "partition key" of Kafka messages is the
same as the "message key" itself. And the message key is the same as the
state store key.

Since the state store here should be storing the running aggregate, it
means that the partition key is the same as the aggregated key.

If you are doing a group-by aggregation here where the group-by keys are
different from the source input topic's keys, then the state store keys
would be different from the input topic keys.
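
As an illustration of that last point, a minimal DSL sketch (topic name, key format and
default string serdes are assumptions) where the group-by key -- and therefore the store
and changelog key -- differs from the input topic key:

```
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("input-topic");  // source records keyed by, say, (a)
KTable<String, Long> counts = input
    .groupBy((key, value) -> key + "|" + value)  // group-by key differs from the record key
    .count();                                    // count store + changelog are keyed by the new key
```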


> 2. Is there a background thread to persist in the state store when caching
> is disabled? When will the app commit the log for the input topic? Is it
> when sink writes into the output topic or when the state store writes into
> the changelog topic? Because, if the app commits the record before the data
> was written to changelog topic then we can again explain this state
>
> The commit happens *after* the local state store, as well as the
changelog records sent by the Streams' producers, have been flushed. I.e.
if there's a failure in between, you would re-process some source records
and hence cause duplicates, but no data loss (a.k.a. the at_least_once
semantics).



> >You may also consider upgrading to 2.6.x or higher version and see if this
> issue goes away.
> Do you mean the client or the Kafka broker? I will be upgrading the client
> to 2.7.0 soon.
>
> I meant the client.


> Sadly looking into the timestamp will not help much as we use some business
> time field to set the record timestamp. If I am right, there is no way now
> to know when a producer wrote a record to a Kafka topic.
>
> Regards,
> Mangat
>
>
>
> On Wed, Apr 7, 2021 at 6:22 PM Guozhang Wang  wrote:
>
> > Hello Mangat,
> >
> > With at least once, although some records maybe processed multiple times
> > their process ordering should not be violated, so what you observed is
> not
> > expected. What caught my eyes are this section in your output changelogs
> > (high-lighted):
> >
> > Key1, V1
> > Key1, null
> > Key1, V1
> > Key1, null  (processed again)
> > Key1, V2
> > Key1, null
> >
> > *Key1, V1Key1,V2*
> > Key1, V2+V1 (I guess we didn't process V2 tombstone yet but reprocessed
> V1
> > again due to reassignment)
> >
> > They seem to be the result of first receiving a tombstone which removes
> V1
> > and then a new record that adds V2. However, since caching is disabled
> you
> > should get
> >
> > *Key1,V1*
> > *Key1,null*
> > *Key1,V2*
> >
> > instead; without the actual code snippet I cannot tell more what's
> > happening here. If you can look into the logs you can record each time
> when
> > partition migrates, how many records from the changelog was replayed to
> > restore the store, and from which offset on the input topic does Streams
> > resume processing. You may also consider upgrading to 2.6.x or higher
> > version and see if this issue goes away.
> >
> >
> > Guozhang
> >
> > On Tue, Apr 6, 2021 at 8:38 AM mangat rai  wrote:
> >
> > > Hey,
> > >
> > > We have the following setup in our infrastructure.
> > >
> > >1. Kafka - 2.5.1
> > >2. Apps use kafka streams `org.apache.kafka` version 2.5.1 library
> > >3. Low level processor API is used with *atleast-once* semantics
> > >4. State stores are *in-memory* with *caching disabled* and
> *changelog
> > >enabled*
> > >
> > >
> > > Is it possible that during state replication and partition
> reassignment,
> > > the input data is not always applied to the state store?
> > >
> > > 1. Let's say the input topic is having records like following
> > >
> > > ```
> > > Key1, V1
> > > Key1, null (tombstone)
> > > Key1, V2
> > > Key1, null
> > > Key1, V3
> > > Key1, V4
> > > ```
> &g

[ANNOUNCE] New Committer: Bruno Cadonna

2021-04-07 Thread Guozhang Wang
Hello all,

I'm happy to announce that Bruno Cadonna has accepted his invitation to
become an Apache Kafka committer.

Bruno has been contributing to Kafka since Jan. 2019 and has made 99
commits and more than 80 PR reviews so far:

https://github.com/apache/kafka/commits?author=cadonna

He worked on a few key KIPs on Kafka Streams:

* KIP-471: Expose RocksDB Metrics in Kafka Streams
* KIP-607: Add Metrics to Kafka Streams to Report Properties of RocksDB
* KIP-662: Throw Exception when Source Topics of a Streams App are Deleted

Besides all the code contributions and reviews, he's also done a handful of
things for the community: multiple Kafka meetup talks in Berlin and Kafka
Summit talks, an introductory class on Kafka at Humboldt-Universität zu
Berlin seminars, and co-authoring a paper on Kafka's stream processing
semantics at this year's SIGMOD conference (
https://en.wikipedia.org/wiki/SIGMOD). Bruno has also been quite active on
the SO channels and AK mailing lists.

Please join me in congratulating Bruno on all his contributions!

-- Guozhang


Re: Kafka Stream: State replication seems unpredictable.

2021-04-07 Thread Guozhang Wang
Hello Mangat,

With at-least-once, although some records may be processed multiple times,
their processing order should not be violated, so what you observed is not
expected. What caught my eye is this section in your output changelogs
(highlighted):

Key1, V1
Key1, null
Key1, V1
Key1, null  (processed again)
Key1, V2
Key1, null

*Key1, V1*
*Key1,V2*
Key1, V2+V1 (I guess we didn't process V2 tombstone yet but reprocessed V1
again due to reassignment)

They seem to be the result of first receiving a tombstone which removes V1
and then a new record that adds V2. However, since caching is disabled you
should get

*Key1,V1*
*Key1,null*
*Key1,V2*

instead; without the actual code snippet I cannot tell more about what's
happening here. If you look into the logs, you can record each time a
partition migrates, how many records from the changelog were replayed to
restore the store, and from which offset on the input topic Streams
resumes processing. You may also consider upgrading to 2.6.x or a higher
version and seeing if this issue goes away.


Guozhang

On Tue, Apr 6, 2021 at 8:38 AM mangat rai  wrote:

> Hey,
>
> We have the following setup in our infrastructure.
>
>1. Kafka - 2.5.1
>2. Apps use kafka streams `org.apache.kafka` version 2.5.1 library
>3. Low level processor API is used with *atleast-once* semantics
>4. State stores are *in-memory* with *caching disabled* and *changelog
>enabled*
>
>
> Is it possible that during state replication and partition reassignment,
> the input data is not always applied to the state store?
>
> 1. Let's say the input topic is having records like following
>
> ```
> Key1, V1
> Key1, null (tombstone)
> Key1, V2
> Key1, null
> Key1, V3
> Key1, V4
> ```
> 2. The app has an aggregation function which takes these record and update
> the state store so that changelog shall be
>
> ```
> Key1, V1
> Key1, null (tombstone)
> Key1, V2
> Key1, null
> Key1, V3
> Key1, V3 + V4
> ```
> Let's say the partition responsible for processing the above key was
> several times reallocated to different threads due to some infra issues we
> are having(in Kubernetes where we run the app, not the Kafka cluster).
>
> I see the following record in the changelogs
>
> ```
> Key1, V1
> Key1, null
> Key1, V1
> Key1, null  (processed again)
> Key1, V2
> Key1, null
> Key1, V1
> Key1,V2
> Key1, V2+V1 (I guess we didn't process V2 tombstone yet but reprocessed V1
> again due to reassignment)
> Key1,V1 (V2 is gone as there was a tombstone, but then V1 tombstone should
> have been applied also!!)
> Key1, V2+V1 (it is back!!!)
> Key1,V1
> Key1, V1 + V2 + V3 (This is the final state)!
> ```
>
> If you see this means several things
> 1. The state is always correctly applied locally (in developer laptop),
> where there were no reassignments.
> 2. The records are processed multiple times, which is understandable as we
> have at least symantics here.
> 3. As long as we re-apply the same events in the same orders we are golden
> but looks like some records are skipped, but here it looks as if we have
> multiple consumers reading and update the same topics, leading to race
> conditions.
>
> Is there any way, Kafka streams' state replication could lead to such a
> race condition?
>
> Regards,
> Mangat
>


-- 
-- Guozhang


Re: Kafka Streams application startup issues

2021-04-05 Thread Guozhang Wang
Hello Mikko,

The issue you saw, where restoration keeps repeating and never completes,
is a bit weird, and without further logs I cannot tell exactly what the
root cause is.

At the same time, if your common scenario is an upgrade, maybe you can
consider using static membership to avoid unnecessary rebalances (and hence
the restorations they bring in):

https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances

Another thing: if for some reason (including rolling bounces) some
rebalances are unavoidable and are taking time, you can use standby
replicas to keep serving query access:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance

Hope these ref links may help you.
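
A sketch of what those two suggestions look like in the Streams config; the instance id
and the values are placeholders, and each instance needs its own stable group.instance.id:

```
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Static membership (KIP-345): a stable per-instance id plus a generous session timeout,
// so a restart within the timeout does not trigger a rebalance.
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), "query-api-instance-1");
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 120000);
// Standby replicas (KIP-535): warm copies of the stores that can serve (possibly stale)
// IQ reads while the active task is restoring.
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
```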


Guozhang

On Wed, Mar 31, 2021 at 8:20 AM Hänninen, Mikko
 wrote:

> Hello,
>
>
>
> I’m working on creating an application that leverages Kafka and Kafka
> Streams. I have some issues with application startup that I’ve been unable
> to solve myself even with the help of my team mates, so I’m writing here in
> the hopes that someone could offer help. I’d be very grateful.
>
>
>
>
>
> Application description:
>
>
>
> The application is running in AWS and uses the AWS MSK service for the
> operation of the brokers. The application is running in parallel on
> multiple nodes, typically we have 3 but it’s meant to scale to tens if
> needed. The number of brokers is also currently 3. Kafka version is
> 2.6.0, both in MSK and in the Kafka libraries included with the application.
>
>
>
> The application is running on the US west coast, while the Kafka brokers are
> in Europe, so there is some network lag between them. (There's another group of 3
> servers running also in Europe, with a different application id
> configured, so the servers in a given geographic location have their own
> consumer groups.)
>
>
>
> The application uses a Kafka Streams ReadOnlyKeyValueStore to consume a
> Kafka topic, say topic R, which has key-value pairs. The key is a string or
> other multibyte value, the value is a serialised structure with a number
> (Long) and some other data. The application provides a REST API through
> which clients can make requests with some key, and the application returns
> the number from the value, which should be the latest number seen for the
> given key in the topic. The goal of the API is to respond within
> milliseconds, e.g. 5 or 10 ms or so. (This is the reason why the
> application servers are geographically far away from the brokers, to
> provide low latency in that location.)
>
>
>
> If the requested key is not local on a given server, the application
> determines which server has that key based on the Kafka metadata, and
> forwards the request to that server. This part works fine, at least in
> terms of Kafka use.
>
>
>
> The key space is expected to be very large, perhaps tens or hundreds of
> millions and maybe more. The application is still in development so we
> have not seen that many yet in practice, at most it’s probably some few
> thousands or tens of thousands with generated test data.
>
>
>
>
>
> Problem description:
>
>
>
> The reason why I’m writing here is to get help with Kafka/Kafka Streams
> startup issues. Sometimes, but much too frequently, when all the servers
> are restarted e.g. due to deploying a new version of the application, some
> of the applications will not start up cleanly.
>
>
>
> At first there was the error with the message “topic may have migrated to
> another instance”. This was eventually solved by applying retrying for more
> than 10 minutes, after which there was apparently a rebalance and the
> server in question was able to synchronise with Kafka and join to the
> consumer group. This still happens and having a startup time of over 10
> minutes is not desirable, but at least it’s no longer blocking development.
>
>
>
> Now there’s a second startup issue, with an exception
> org.apache.kafka.common.errors.DisconnectException being thrown by
> org.apache.kafka.clients.FetchSessionHandler with the message “Error
> sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2:”
>
>
>
> Before the timeout there’s a restore log message “stream-thread
> [query-api-us-west-2-0943f8d4-1720-4b3b-904d-d2efa190a135-StreamThread-1]
> Restoration in progress for 20 partitions.” followed by a dump of the 20
> partitions. e.g.
> “{query-api-us-west-2-query-api-us-west-2-prevalence-ratings-changelog-49:
> position=0, end=37713, totalRestored=0}” -- the position and totalRestored
> are always 0.
>
> The partitions are for the changelog topic associated with the above
> mentioned topic R. There are 60 partitions total in R, so 20 matches the
> expected count per server (60/3). I’m assuming the number of partitions in
> the changelog is the same as the actual topic.
>
>
>
> These log messages repeat every 31 seconds or so.
>
>
>
> Kafka Streams state does not reach RUNNING, the 

Re: Yet Another Repartitioning Question About Kafka Streams

2021-04-05 Thread Guozhang Wang
Hello Gareth,

1) For this scenario, its state should be reusable and we do not need to
read it from scratch from Kafka to rebuild.

2) A "warmup replica" is just a special standby replica that is temporary;
note that if there is no partition migration needed at the moment, the
number of warmup replicas is actually zero. The difference between the
`max.warmup.replicas` config and the `num.standby.replicas` config is that
the former is a global limit, while the latter is a per-task number.
I.e., if you have a total of N tasks and you have these configs set to P
and Q, then during normal processing you'll have (Q+1) * N total replicas,
while during a rebalance you may have up to (Q+1) * N + P total replicas.
As you can see, setting P to a value larger than one means that a single
rebalance run may be able to warm up multiple partitions yet to be moved,
at the cost of more space temporarily, while a smaller number means you
may need more rounds of rebalances to achieve the end rebalance goal.

3) Yes, if there are standby replicas, then you can still access
standby's states via IQ. You can read this KIP for more details:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
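
To make the numbers above concrete, a minimal config sketch; the values are
placeholders, and `props` is assumed to be the java.util.Properties passed to
the KafkaStreams constructor:

// Fragment of the StreamsConfig properties. With N = 10 tasks, Q = 1 and P = 2 you
// would run (Q+1) * N = 20 replicas during normal processing, and at most
// (Q+1) * N + P = 22 while a rebalance is warming up state on new instances.
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);   // Q: standbys per task
props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);    // P: global cap on warmup replicas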


Guozhang

On Sun, Apr 4, 2021 at 12:41 PM Gareth Collins 
wrote:

> Hi,
>
> Thanks very much for answers to my previous questions here.
>
> I had a couple more questions about repartitioning and I just want to
> confirm my understanding.
>
> (1) Given the following scenario:
>
> (a) I have a cluster of Kafka stream nodes with partitions assigned to
> each.
>
> (b) One node goes down...and it goes down for long enough that a
> repartition happens (i.e. a time greater than
> scheduled.rebalance.max.delay.ms passes by).
>
> (c) Then the node finally comes back. If the state is still there can it
> still be used (assuming it is assigned the same partitions)...and only the
> delta read from Kafka? Or will it need to read everything again to rebuild
> the state? I assume it has to re-read the state but I want to make sure.
>
> (2) I understand warmup replicas help with minimizing downtime. If I
> understand correctly, if I have at least one warmup replica configured and
> if the state needed to be rebuilt from scratch in the scenario above,
> switchover back to the old node will be delayed until the rebuild is
> complete. Is my understanding correct? If my understanding is correct, why
> would you ever set more than one warmup replica? Or should warmup replicas
> usually be equal to standby replicas + 1 just in case multiple nodes are
> stood up simultaneously?
>
> (3) If I set the scheduled rebalance delay to be greater than 0 and a node
> goes down, will I be able to access the state data from other replicas
> while I am waiting for the rebalance?
>
> Any answers would be greatly appreciated.
>
> thanks,
> Gareth
>


-- 
-- Guozhang


Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-30 Thread Guozhang Wang
Great to hear! Always a pleasure.

Guozhang

On Tue, Mar 30, 2021 at 8:04 PM Upesh Desai  wrote:

> Hi Guozhang,
>
>
>
> We can confirm the behavior with the 2.7.1 release. Appreciate all the
> help!
>
>
>
> Cheers,
>
> Upesh
>
>
> Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> 
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Guozhang Wang 
> *Date: *Tuesday, March 30, 2021 at 2:10 PM
> *To: *Users 
> *Cc: *Bart Lilje 
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> Great, I think https://issues.apache.org/jira/browse/KAFKA-12323 is indeed
> the root cause then. Note that this is only an issue with punctuation
> triggered events, where `context.timestamp()` would return 0 (and it is
> fixed in the yet-to-release 2.7.1/2.8.0).
>
> You can consider applying the patch if you could on top of 2.7.0, or wait
> for the new release; OR, if your production code does not actually use
> punctuation to write records to Kafka, then this issue would not actually
> impact you.
>
>
> Guozhang
>
> On Tue, Mar 30, 2021 at 11:56 AM Upesh Desai  wrote:
>
> > Hi Guozhang,
> >
> >
> >
> > Great to hear we might have found the issue!
> >
> >
> >
> > To answer your question, the changelog record is generated by us calling
> > ‘store.put(key,value)’ from the punctuate callback, which makes sense
> then
> > because the timestamp would be 0 like you saw in your test as well.
> >
> >
> >
> > Best,
> >
> > Upesh
> >
> >
> > Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> > 
> > *www.itrsgroup.com* <https://www.itrsgroup.com/>
> > <https://www.itrsgroup.com/>
> >
> > *From: *Guozhang Wang 
> > *Date: *Tuesday, March 30, 2021 at 1:00 PM
> > *To: *Users 
> > *Cc: *Bart Lilje 
> > *Subject: *Re: Kafka Streams Processor API state stores not restored via
> > changelog topics
> >
> > Hello Upesh,
> >
> > These are super helpful logs, and I think I'm very close to the root
> cause
> > of it. You see, the written changelog record's timestamp is set to 0
> > (i.e. January 1st 1970 at midnight GMT), and hence given a reasonable
> Kafka
> > server start time (presumably in the 21st century), the retention time would
> > always be breached, causing the log deletion mechanism to trigger.
> >
> > The timestamp is set with `context.timestamp()` which would use the
> > processing record's timestamp; but I myself have seen and fixed a bug (
> > https://issues.apache.org/jira/browse/KAFKA-12323) where the timestamp
> was
> > not populated and hence set to 0 if it was generated as part of a
> punctuation.
> > So my next key question is: is this changelog record generated, i.e. its
> > put call triggered, from processing an input record, or from a
> punctuation
> > call?
> >
> >
> > Guozhang
> >
> > On Mon, Mar 29, 2021 at 2:01 PM Upesh Desai 
> wrote:
> >
> > > Hi Guozhang,
> > >
> > >
> > >
> > > When testing with a 2.6.1 broker and 2.7 streams application, I see the
> > > same behavior as described before with the 2.7 broker where just after
> a
> > > record is written to the changelog topic, the log segment is rolled and
> > > deleted citing that the retention time has passed (the record was
> written
> > > to the state store at ~15:49:
> > >
> > >
> > >
> > > [2021-03-29 15:49:13,757] INFO [Log
> > > partition=test-stream-store-changelog-4,
> > > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Found deletable
> > > segments with base offsets [0] due to retention time 25920ms breach
> > > (kafka.log.Log)
> > > [2021-03-29 15:49:13,761] INFO [ProducerStateManager
> > > partition=test-stream-store-changelog-4] Writing producer snapshot at
> > > offset 1 (kafka.log.ProducerStateManager)
> > > [2021-03-29 15:49:13,763] INFO [Log
> > > partition=test-stream-store-changelog-4,
> > > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Rolled new log
> > > segment at offset 1 in 5 ms. (kafka.log.Log)
> > > [2021-03-29 15:49:13,764] INFO [Log
> > > partition=test-stream-store-changelog-4,
> > > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Scheduling
> segments
> > > for deletion LogSegment(baseOffset=0, size=156,
> > > lastModifiedTime=1617050940118

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-30 Thread Guozhang Wang
Great, I think https://issues.apache.org/jira/browse/KAFKA-12323 is indeed
the root cause then. Note that this is only an issue with punctuation
triggered events, where `context.timestamp()` would return 0 (and it is
fixed in the yet-to-release 2.7.1/2.8.0).

You can consider applying the patch if you could on top of 2.7.0, or wait
for the new release; OR, if your production code does not actually use
punctuation to write records to Kafka, then this issue would not actually
impact you.


Guozhang

On Tue, Mar 30, 2021 at 11:56 AM Upesh Desai  wrote:

> Hi Guozhang,
>
>
>
> Great to hear we might have found the issue!
>
>
>
> To answer your question, the changelog record is generated by us calling
> ‘store.put(key,value)’ from the punctuate callback, which makes sense then
> because the timestamp would be 0 like you saw in your test as well.
>
>
>
> Best,
>
> Upesh
>
>
> Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> 
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Guozhang Wang 
> *Date: *Tuesday, March 30, 2021 at 1:00 PM
> *To: *Users 
> *Cc: *Bart Lilje 
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> Hello Upesh,
>
> These are super helpful logs, and I think I'm very close to the root cause
> of it. You see, the written changelog record's timestamp is set to 0
> (i.e. January 1st 1970 at midnight GMT), and hence given a reasonable Kafka
> server start time (presumably in the 21st century), the retention time would
> always be breached, causing the log deletion mechanism to trigger.
>
> The timestamp is set with `context.timestamp()` which would use the
> processing record's timestamp; but I myself have seen and fixed a bug (
> https://issues.apache.org/jira/browse/KAFKA-12323) where the timestamp was
> not populated and hence set to 0 if it was generated as part of a punctuation.
> So my next key question is: is this changelog record generated, i.e. its
> put call triggered, from processing an input record, or from a punctuation
> call?
>
>
> Guozhang
>
> On Mon, Mar 29, 2021 at 2:01 PM Upesh Desai  wrote:
>
> > Hi Guozhang,
> >
> >
> >
> > When testing with a 2.6.1 broker and 2.7 streams application, I see the
> > same behavior as described before with the 2.7 broker where just after a
> > record is written to the changelog topic, the log segment is rolled and
> > deleted citing that the retention time has passed (the record was written
> > to the state store at ~15:49:
> >
> >
> >
> > [2021-03-29 15:49:13,757] INFO [Log
> > partition=test-stream-store-changelog-4,
> > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Found deletable
> > segments with base offsets [0] due to retention time 25920ms breach
> > (kafka.log.Log)
> > [2021-03-29 15:49:13,761] INFO [ProducerStateManager
> > partition=test-stream-store-changelog-4] Writing producer snapshot at
> > offset 1 (kafka.log.ProducerStateManager)
> > [2021-03-29 15:49:13,763] INFO [Log
> > partition=test-stream-store-changelog-4,
> > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Rolled new log
> > segment at offset 1 in 5 ms. (kafka.log.Log)
> > [2021-03-29 15:49:13,764] INFO [Log
> > partition=test-stream-store-changelog-4,
> > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Scheduling segments
> > for deletion LogSegment(baseOffset=0, size=156,
> > lastModifiedTime=1617050940118, largestTime=0) (kafka.log.Log)
> > [2021-03-29 15:49:13,765] INFO [Log
> > partition=test-stream-store-changelog-4,
> > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Incremented log
> start
> > offset to 1 due to segment deletion (kafka.log.Log)
> >
> >
> >
> > Does this have anything to do with the *largetTime=0* mentioned in the
> > log? This was the first and only record written to the store/changelog.
> Is
> > there anything else we can try to resolve this issue or give us more
> > insight into where this issue could originate from?
> >
> >
> >
> > Thanks,
> > Upesh
> >
> >
> > Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> > 
> > *www.itrsgroup.com* <https://www.itrsgroup.com/>
> > <https://www.itrsgroup.com/>
> >
> > *From: *Upesh Desai 
> > *Date: *Thursday, March 25, 2021 at 6:46 PM
> > *To: *users@kafka.apache.org 
> > *Cc: *Bart Lilje 
> > *Subject: *Re: Kafka Streams Processor API state stores not restored via
> > changelog topics
> >
> > We have not tried 

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-30 Thread Guozhang Wang
Hello Upesh,

These are super helpful logs, and I think I'm very close to the root cause
of it. You see, the written changelog record's timestamp is set to 0
(i.e. January 1st 1970 at midnight GMT), and hence given a reasonable Kafka
server start time (presumably in the 21st century), the retention time would
always be breached, causing the log deletion mechanism to trigger.

The timestamp is set with `context.timestamp()` which would use the
processing record's timestamp; but I myself have seen and fixed a bug (
https://issues.apache.org/jira/browse/KAFKA-12323) where the timestamp was
not populated and hence set to 0 if it was generated as part of a punctuation.
So my next key question is: is this changelog record generated, i.e. its
put call triggered, from processing an input record, or from a punctuation
call?
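
For reference, a minimal sketch of the punctuation-driven write pattern in
question, using the Processor API; the store name, key/value types and the
schedule interval are illustrative assumptions:

import java.time.Duration;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueStore;

public class PunctuationWriteSketch implements Processor<String, byte[]> {

    private KeyValueStore<String, byte[]> store;

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        store = (KeyValueStore<String, byte[]>) context.getStateStore("test-store");
        // A put() issued from a punctuation has no "current record", so on 2.7.0
        // (before KAFKA-12323 was fixed) the changelog record it produces carries
        // timestamp 0, which a "compact,delete" changelog treats as long past retention.
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> store.put("marker", new byte[0]));
    }

    @Override
    public void process(final String key, final byte[] value) {
        store.put(key, value);   // puts from process() use the input record's timestamp
    }

    @Override
    public void close() { }
}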


Guozhang

On Mon, Mar 29, 2021 at 2:01 PM Upesh Desai  wrote:

> Hi Guozhang,
>
>
>
> When testing with a 2.6.1 broker and 2.7 streams application, I see the
> same behavior as described before with the 2.7 broker where just after a
> record is written to the changelog topic, the log segment is rolled and
> deleted citing that the retention time has passed (the record was written
> to the state store at ~15:49:
>
>
>
> [2021-03-29 15:49:13,757] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Found deletable
> segments with base offsets [0] due to retention time 25920ms breach
> (kafka.log.Log)
> [2021-03-29 15:49:13,761] INFO [ProducerStateManager
> partition=test-stream-store-changelog-4] Writing producer snapshot at
> offset 1 (kafka.log.ProducerStateManager)
> [2021-03-29 15:49:13,763] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Rolled new log
> segment at offset 1 in 5 ms. (kafka.log.Log)
> [2021-03-29 15:49:13,764] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Scheduling segments
> for deletion LogSegment(baseOffset=0, size=156,
> lastModifiedTime=1617050940118, largestTime=0) (kafka.log.Log)
> [2021-03-29 15:49:13,765] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Incremented log start
> offset to 1 due to segment deletion (kafka.log.Log)
>
>
>
> Does this have anything to do with the *largetTime=0* mentioned in the
> log? This was the first and only record written to the store/changelog. Is
> there anything else we can try to resolve this issue or give us more
> insight into where this issue could originate from?
>
>
>
> Thanks,
> Upesh
>
>
> Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> 
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Upesh Desai 
> *Date: *Thursday, March 25, 2021 at 6:46 PM
> *To: *users@kafka.apache.org 
> *Cc: *Bart Lilje 
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> We have not tried running 2.6 brokers and 2.7 client, I will try and get
> back to you.
>
>
>
> We are not enabling EOS on the streams, we have it set to AT_LEAST_ONCE.
> The shutdowns and restarts of the stream app are clean each time.
>
>
>
> I see in the broker logs certain lines indicating that the log segment is
> being rolled and deleted, but I don’t see how or why this should be
> happening when the records were just written. See the log line snippets
> included in the attached file. Initially 8 records are added (offsets 0-8),
> followed by a single record (offset 9). They are rolled and deleted almost
> instantly.
>
>
>
> Best,
>
> Upesh
>
>
>
> *Upesh Desai**​*
>
> * | *
>
> Senior Software Developer
>
>  |
>
> *ude...@itrsgroup.com *
>
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
>
> <https://www.itrsgroup.com/>
>
> *From: *Guozhang Wang 
> *Date: *Thursday, March 25, 2021 at 6:31 PM
> *To: *Users 
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> BTW, yes that indicates the record in the changelog was already truncated
> (logically). But since we only physically truncate logs by segments, which
> is 1GB by default, it should still be physically on the log. Are you
> enabling EOS on Streams, and when you shutdown the streams app, is that a
> clean shutdown?
>
> On Thu, Mar 25, 2021 at 4:22 PM Guozhang Wang  wrote:
>
> > That's indeed weird.
> >
> > Have you tried to run Kafka brokers with 2.6 while Kafka Streams client
> > with 2.7?
> >
> > On Thu, Mar 25, 2021 at 2:34 PM Upes

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-25 Thread Guozhang Wang
BTW, yes that indicates the record in the changelog was already truncated
(logically). But since we only physically truncate logs by segments, which
is 1GB by default, it should still be physically on the log. Are you
enabling EOS on Streams, and when you shutdown the streams app, is that a
clean shutdown?

On Thu, Mar 25, 2021 at 4:22 PM Guozhang Wang  wrote:

> That's indeed weird.
>
> Have you tried to run Kafka brokers with 2.6 while Kafka Streams client
> with 2.7?
>
> On Thu, Mar 25, 2021 at 2:34 PM Upesh Desai  wrote:
>
>> Hello Guozhang,
>>
>>
>>
>> I have tried your suggestions with an inMemoryStore FYI and seen the
>> following:
>>
>>
>>
>>1. I have the record added to the state store, stopped the
>>application, and check the earliest and latest offsets via the command 
>> line
>>tools. This shows that the earliest offset is 1, and the latest offset is
>>also 1. Does this mean that the record has been marked for deletion
>>already? My retention.ms config is set to 3 days (25920 ms), so
>>it should not be marked for deletion if added a couple minutes prior?
>>2. Following the above, this makes sense as well. When logging the
>>starting offset, it is not 0, but rather 1:
>>
>>*topic: streamapp-teststore-changelog, partition: 4, start offset: 1,
>>end offset: 1*
>>
>>
>>
>> I also confirmed different behavior when we change the changelog topic
>> cleanup policy from “*compact,delete”* to just “*compact”*. We DO NOT
>> see this issue when the changelog is just set to compact. We also confirmed
>> that this does not happen when we run everything on Kafka version 2.6.
>>
>>
>>
>> Thanks,
>>
>> Upesh
>>
>>
>> Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
>> 
>> *www.itrsgroup.com* <https://www.itrsgroup.com/>
>> <https://www.itrsgroup.com/>
>>
>> *From: *Guozhang Wang 
>> *Date: *Thursday, March 25, 2021 at 4:01 PM
>> *To: *Users 
>> *Cc: *Bart Lilje 
>> *Subject: *Re: Kafka Streams Processor API state stores not restored via
>> changelog topics
>>
>> Hello Upesh,
>>
>> Could you confirm a few more things for me:
>>
>> 1. After you stopped the application, and wiped out the state dir; check
>> if
>> the corresponding changelog topic has one record indeed at offset 0 ---
>> this can be done via the admin#listOffsets (get the earliest and latest
>> offset, which should be 0 and 1 correspondingly).
>> 2. After you resumed the application, check from which starting position
>> we
>> are restoring the changelog --- this can be done via implementing the
>> `stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
>> restoreEndOffset);`, should be 0
>>
>> If both of them check out fine as expected, then from the code I think
>> bufferedLimitIndex should be updated to 1.
>>
>>
>> Guozhang
>>
>> On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai  wrote:
>>
>> > Hi Guozhang,
>> >
>> >
>> >
>> > Here are some of the answers to your questions I see during my testing:
>> >
>> >
>> >
>> >1. ChangelogMetadata#restoreEndOffset == 1 ; This is expected as in
>> my
>> >test 1 record had been added to the store. However the numRecords
>> variable
>> >is still set to 0
>> >2. For that particular test, `hasRestoredToEnd()` indeed returns true
>> >as well. But it is confusing since the store is actually empty / that
>> >record I added does not exist in the store when trying to check for
>> it.
>> >3. N/A
>> >
>> >
>> >
>> > A little more information, the records we add to this store/changelog
>> are
>> > of type  where the value is always set to an empty
>> byte
>> > array `new byte[0]`. A couple other variations I have tried are setting
>> to
>> > a non-empty static byte array such as `new byte[1]` or `new byte[]{1}`.
>> >
>> >
>> >
>> > Hope this gives a little more clarity and hope to hear from you soon.
>> >
>> >
>> >
>> > Thanks,
>> >
>> > Upesh
>> >
>> >
>> > Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
>> > 
>> > *www.itrsgroup.com* <https://www.itrsgroup.com/>
>> > <https://www.itrsgroup.com/>
>> >
>> > *From: *Guozhang Wang 
>> 

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-25 Thread Guozhang Wang
That's indeed weird.

Have you tried to run Kafka brokers with 2.6 while Kafka Streams client
with 2.7?

On Thu, Mar 25, 2021 at 2:34 PM Upesh Desai  wrote:

> Hello Guozhang,
>
>
>
> I have tried your suggestions with an inMemoryStore FYI and seen the
> following:
>
>
>
>1. I have the record added to the state store, stopped the
>application, and check the earliest and latest offsets via the command line
>tools. This shows that the earliest offset is 1, and the latest offset is
>also 1. Does this mean that the record has been marked for deletion
>already? My retention.ms config is set to 3 days (25920 ms), so it
>should not be marked for deletion if added a couple minutes prior?
>2. Following the above, this makes sense as well. When logging the
>starting offset, it is not 0, but rather 1:
>
>*topic: streamapp-teststore-changelog, partition: 4, start offset: 1,
>end offset: 1*
>
>
>
> I also confirmed different behavior when we change the changelog topic
> cleanup policy from “*compact,delete”* to just “*compact”*. We DO NOT see
> this issue when the changelog is just set to compact. We also confirmed
> that this does not happen when we run everything on Kafka version 2.6.
>
>
>
> Thanks,
>
> Upesh
>
>
> Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> 
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Guozhang Wang 
> *Date: *Thursday, March 25, 2021 at 4:01 PM
> *To: *Users 
> *Cc: *Bart Lilje 
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> Hello Upesh,
>
> Could you confirm a few more things for me:
>
> 1. After you stopped the application, and wiped out the state dir; check if
> the corresponding changelog topic has one record indeed at offset 0 ---
> this can be done via the admin#listOffsets (get the earliest and latest
> offset, which should be 0 and 1 correspondingly).
> 2. After you resumed the application, check from which starting position we
> are restoring the changelog --- this can be done via implementing the
> `stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
> restoreEndOffset);`, should be 0
>
> If both of them check out fine as expected, then from the code I think
> bufferedLimitIndex should be updated to 1.
>
>
> Guozhang
>
> On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai  wrote:
>
> > Hi Guozhang,
> >
> >
> >
> > Here are some of the answers to your questions I see during my testing:
> >
> >
> >
> >1. ChangelogMetadata#restoreEndOffset == 1 ; This is expected as in my
> >test 1 record had been added to the store. However the numRecords
> variable
> >is still set to 0
> >2. For that particular test, `hasRestoredToEnd()` indeed returns true
> >as well. But it is confusing since the store is actually empty / that
> >record I added does not exist in the store when trying to check for
> it.
> >3. N/A
> >
> >
> >
> > A little more information, the records we add to this store/changelog are
> > of type  where the value is always set to an empty byte
> > array `new byte[0]`. A couple other variations I have tried are setting
> to
> > a non-empty static byte array such as `new byte[1]` or `new byte[]{1}`.
> >
> >
> >
> > Hope this gives a little more clarity and hope to hear from you soon.
> >
> >
> >
> > Thanks,
> >
> > Upesh
> >
> >
> > Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> > 
> > *www.itrsgroup.com* <https://www.itrsgroup.com/>
> > <https://www.itrsgroup.com/>
> >
> > *From: *Guozhang Wang 
> > *Date: *Wednesday, March 24, 2021 at 1:37 PM
> > *To: *Users 
> > *Cc: *Bart Lilje 
> > *Subject: *Re: Kafka Streams Processor API state stores not restored via
> > changelog topics
> >
> > Hello Upesh,
> >
> > Thanks for the detailed report. I looked through the code and tried to
> > reproduce the issue, but so far have not succeeded. I think I may need
> some
> > further information from you to help my further investigation.
> >
> > 1) The `bufferedLimitIndex == 0` itself does not necessarily mean there's
> > an issue, as long as it could still be bumped later (i.e. it is possible
> > that the restore consumer has not fetched data yet). What's key though,
> is
> > to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it would
> > be created with null value, and then been

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-25 Thread Guozhang Wang
Hello Upesh,

Could you confirm a few more things for me:

1. After you stopped the application and wiped out the state dir, check if
the corresponding changelog topic indeed has one record at offset 0 ---
this can be done via admin#listOffsets (get the earliest and latest
offsets, which should be 0 and 1 respectively).
2. After you resumed the application, check from which starting position we
are restoring the changelog --- this can be done by implementing
`stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
restoreEndOffset);`, where the start offset should be 0.

If both of them check out fine as expected, then from the code I think
bufferedLimitIndex should be updated to 1.
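
A minimal sketch of those two checks; the bootstrap servers, topic, partition
and store names are illustrative:

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.StateRestoreListener;

public final class RestoreChecks {

    // Check 1: print the earliest and latest offsets of the changelog partition.
    static void printChangelogOffsets(final String bootstrap, final String topic, final int partition)
            throws ExecutionException, InterruptedException {
        final Properties config = new Properties();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        try (Admin admin = Admin.create(config)) {
            final TopicPartition tp = new TopicPartition(topic, partition);
            final long earliest = admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.earliest()))
                    .partitionResult(tp).get().offset();
            final long latest = admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()))
                    .partitionResult(tp).get().offset();
            System.out.printf("%s: earliest=%d latest=%d%n", tp, earliest, latest);
        }
    }

    // Check 2: log the starting position of every restoration; register before streams.start().
    static void logRestoreStart(final KafkaStreams streams) {
        streams.setGlobalStateRestoreListener(new StateRestoreListener() {
            @Override
            public void onRestoreStart(final TopicPartition tp, final String store,
                                       final long startOffset, final long endOffset) {
                System.out.printf("restoring %s for %s from offset %d to %d%n",
                        store, tp, startOffset, endOffset);
            }
            @Override
            public void onBatchRestored(final TopicPartition tp, final String store,
                                        final long batchEndOffset, final long numRestored) { }
            @Override
            public void onRestoreEnd(final TopicPartition tp, final String store,
                                     final long totalRestored) { }
        });
    }
}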


Guozhang

On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai  wrote:

> Hi Guozhang,
>
>
>
> Here are some of the answers to your questions I see during my testing:
>
>
>
>1. ChangelogMetadata#restoreEndOffset == 1 ; This is expected as in my
>test 1 record had been added to the store. However the numRecords variable
>is still set to 0
>2. For that particular test, `hasRestoredToEnd()` indeed returns true
>as well. But it is confusing since the store is actually empty / that
>record I added does not exist in the store when trying to check for it.
>3. N/A
>
>
>
> A little more information, the records we add to this store/changelog are
> of type  where the value is always set to an empty byte
> array `new byte[0]`. A couple other variations I have tried are setting to
> a non-empty static byte array such as `new byte[1]` or `new byte[]{1}`.
>
>
>
> Hope this gives a little more clarity and hope to hear from you soon.
>
>
>
> Thanks,
>
> Upesh
>
>
> Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> 
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Guozhang Wang 
> *Date: *Wednesday, March 24, 2021 at 1:37 PM
> *To: *Users 
> *Cc: *Bart Lilje 
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> Hello Upesh,
>
> Thanks for the detailed report. I looked through the code and tried to
> reproduce the issue, but so far have not succeeded. I think I may need some
> further information from you to help my further investigation.
>
> 1) The `bufferedLimitIndex == 0` itself does not necessarily mean there's
> an issue, as long as it could still be bumped later (i.e. it is possible
> that the restore consumer has not fetched data yet). What's key though, is
> to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it would
> be created with null value, and then been initialized once. ChangelogReader
> would stop restoring once the current offset has reached beyond this value
> or if this value itself is 0.
>
> 2) If `restoreEndOffset` is initialized to a non-zero value, then check if
> the restoration indeed completed without applying any records, this is
> determined as `hasRestoredToEnd()` returning true.
>
> 3) If `restoreEndOffset` is initialized to 0, then we need to check why: on
> top of my head I can only think of that the consumer's end offset request
> gets the response with 0, indicating the changelog is now empty.
>
>
> Guozhang
>
>
> On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai  wrote:
>
> > Hi all,
> >
> >
> >
> > Our team thinks we discovered a bug over the weekend within the Kafka
> > Streams / Processor API. We are running 2.7.0.
> >
> >
> >
> > When configuring a state store backed by a changelog topic with the
> > cleanup policy configuration set to “compact,delete”:
> >
> >
> >
> > final StoreBuilder<KeyValueStore<K, V>> store = Stores
> >   .keyValueStoreBuilder(
> >     Stores.persistentKeyValueStore(STORE_ID),
> >     kSerde,
> >     vSerde)
> >   .withLoggingEnabled(Map.of(
> >     RETENTION_MS_CONFIG, "9000",
> >     CLEANUP_POLICY_CONFIG, "compact,delete"))
> >   .withCachingEnabled();
> >
> >
> >
> > Here is how we reproduced the problem:
> >
> >1. Records are written to the state store, and subsequently produced
> >to the changelog topic.
> >2. Store streams application
> >3. Delete state.dir directory
> >4. Restart streams application
> >5. Confirm state store is initialized empty with no records restored
> >from changelog
> >
> >
> >
> > We see this problem with both in-memory and RocksDB backed state stores.
> > For persistent state store, if the streams application is restarted
> without
> > the state dir being deleted, the 

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-24 Thread Guozhang Wang
Hello Upesh,

Thanks for the detailed report. I looked through the code and tried to
reproduce the issue, but so far have not succeeded. I think I may need some
further information from you to help my further investigation.

1) The `bufferedLimitIndex == 0` itself does not necessarily mean there's
an issue, as long as it could still be bumped later (i.e. it is possible
that the restore consumer has not fetched data yet). What's key, though, is
to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it would
be created with a null value, and then be initialized once. ChangelogReader
would stop restoring once the current offset has reached beyond this value
or if this value itself is 0.

2) If `restoreEndOffset` is initialized to a non-zero value, then check if
the restoration indeed completed without applying any records, this is
determined as `hasRestoredToEnd()` returning true.

3) If `restoreEndOffset` is initialized to 0, then we need to check why: on
top of my head I can only think of that the consumer's end offset request
gets the response with 0, indicating the changelog is now empty.


Guozhang


On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai  wrote:

> Hi all,
>
>
>
> Our team thinks we discovered a bug over the weekend within the Kafka
> Streams / Processor API. We are running 2.7.0.
>
>
>
> When configuring a state store backed by a changelog topic with the
> cleanup policy configuration set to “compact,delete”:
>
>
>
> final StoreBuilder<KeyValueStore<K, V>> store = Stores
>   .keyValueStoreBuilder(
>     Stores.persistentKeyValueStore(STORE_ID),
>     kSerde,
>     vSerde)
>   .withLoggingEnabled(Map.of(
>     RETENTION_MS_CONFIG, "9000",
>     CLEANUP_POLICY_CONFIG, "compact,delete"))
>   .withCachingEnabled();
>
>
>
> Here is how we reproduced the problem:
>
>1. Records are written to the state store, and subsequently produced
>to the changelog topic.
>2. Store streams application
>3. Delete state.dir directory
>4. Restart streams application
>5. Confirm state store is initialized empty with no records restored
>from changelog
>
>
>
> We see this problem with both in-memory and RocksDB backed state stores.
> For persistent state store, if the streams application is restarted without
> the state dir being deleted, the application still does not “restore” from
> the changelog, but records are still seen in the state store.
>
>
>
> When rolling back to 2.6, we do not see this issue.
>
>
>
> Doing some debugging in the source code, in the StoreChangelogReader class
> I found that the number of records to restore is always 0 based on the
> below snippet:
>
>
>
> private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
> final ProcessorStateManager stateManager = changelogMetadata.stateManager;
> final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
> final TopicPartition partition = storeMetadata.changelogPartition();
> final String storeName = storeMetadata.store().name();
> final int numRecords = changelogMetadata.bufferedLimitIndex;
>
>
>
> Where ‘changelogMetadata.bufferedLimitIndex’ always == 0.
>
>
>
> My question to you all is, 1) Is this expected behavior? 2) If not, is it
> a bug?
>
>
>
> Hope to get some clarity, and thanks in advance!
>
>
>
> Best,
> Upesh
> 
> Upesh Desai​
> Senior Software Developer
> *ude...@itrsgroup.com* 
> *www.itrsgroup.com* 
>


-- 
-- Guozhang


Re: Right Use Case For Kafka Streams?

2021-03-17 Thread Guozhang Wang
Hello Gareth,

A common practice for rolling up aggregations with Kafka Streams is to
materialize only the finest granularity in the processor (5 days in your
case), and to do the coarser-grained roll-up at query-serving time through
the interactive query API -- i.e. whenever a query is issued for a 30-day
aggregate, you do a range scan on the 5-day-aggregate store and compute the
rollup on the fly.
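
For example, a sketch of such a query-time rollup via interactive queries,
assuming the 5-day aggregates are materialized in a windowed store named
"agg-5d" with Long values (names and types are illustrative):

import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public final class RollupQuery {

    // Sum the materialized 5-day buckets for one key over the last 30 days.
    static long thirtyDayTotal(final KafkaStreams streams, final String key) {
        final ReadOnlyWindowStore<String, Long> fiveDayStore = streams.store(
                StoreQueryParameters.fromNameAndType("agg-5d",
                        QueryableStoreTypes.<String, Long>windowStore()));
        final Instant now = Instant.now();
        long total = 0L;
        try (WindowStoreIterator<Long> it =
                     fiveDayStore.fetch(key, now.minus(Duration.ofDays(30)), now)) {
            while (it.hasNext()) {
                total += it.next().value;   // roll the 5-day aggregates up on the fly
            }
        }
        return total;
    }
}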

If you'd prefer to still materialize all of the granularities, since maybe
their query frequency is high enough, you could go with three stores built
as three chained aggregations (i.e. a stream aggregation into 5-day buckets,
the 5-day table aggregated into 10-day buckets, and the 10-day table
aggregated into 30-day buckets).

Guozhang

On Mon, Mar 15, 2021 at 6:11 PM Gareth Collins 
wrote:

> Hi,
>
> We have a requirement to calculate metrics on a huge number of keys (could
> be hundreds of millions, perhaps billions of keys - attempting caching on
> individual keys in many cases will have almost a 0% cache hit rate). Is
> Kafka Streams with RocksDB and compacting topics the right tool for a task
> like that?
>
> As well, just from playing with Kafka Streams for a week it feels like it
> wants to create a lot of separate stores by default (if I want to calculate
> aggregates on five, ten and 30 days I will get three separate stores by
> default for this state data). Coming from a different distributed storage
> solution, I feel like I want to put them together in one store as I/O has
> always been my bottleneck (1 big read and 1 big write is better than three
> small separate reads and three small separate writes).
>
> But am I perhaps missing something here? I don't want to avoid the DSL that
> Kafka Streams provides if I don't have to. Will the Kafka Streams RocksDB
> solution be so much faster than a distributed read that it won't be the
> bottleneck even with huge amounts of data?
>
> Any info/opinions would be greatly appreciated.
>
> thanks in advance,
> Gareth Collins
>


-- 
-- Guozhang


Re: Redis as state store

2021-03-16 Thread Guozhang Wang
Thanks!

On Mon, Mar 15, 2021 at 8:38 PM Sophie Blee-Goldman
 wrote:

> Yep, that fell off my radar.  Here we go:
> https://issues.apache.org/jira/browse/KAFKA-12475
>
> On Mon, Mar 15, 2021 at 8:09 PM Guozhang Wang  wrote:
>
> > Hey Sophie,
> >
> > Maybe we can first create a JIRA ticket for this?
> >
> > On Mon, Mar 15, 2021 at 3:09 PM Sophie Blee-Goldman
> >  wrote:
> >
> > > Sounds good! I meant anyone who is interested :)
> > >
> > > Let me know if you have any questions after digging in to this
> > >
> > > On Mon, Mar 15, 2021 at 2:39 PM Alex Craig 
> > wrote:
> > >
> > > >  Hey Sophie, not sure if you meant me or not but I'd be happy to
> take a
> > > > stab at creating a KIP for this.  I want to spend some time digging
> > into
> > > > more of how this works first, but will then try to gather my thoughts
> > and
> > > > get something created.
> > > >
> > > > Alex
> > > >
> > > > On Mon, Mar 15, 2021 at 1:48 PM Sophie Blee-Goldman
> > > >  wrote:
> > > >
> > > > > This certainly does seem like a flaw in the Streams API, although
> of
> > > > course
> > > > > Streams is just
> > > > > in general not designed for use with remote anything (API calls,
> > > stores,
> > > > > etc)
> > > > >
> > > > > That said, I don't see any reason why we *couldn't* have better
> > support
> > > > for
> > > > > remote state stores.
> > > > > Note that there's currently no deleteAll method on the store
> > interface,
> > > > and
> > > > > not all databases
> > > > > necessarily support that. But we could add a default implementation
> > > which
> > > > > just calls delete(key)
> > > > > on all keys in the state store, and for the RocksDB-based state
> > stores
> > > we
> > > > > still wipe out the state
> > > > > as usual (and recommend the same for any custom StateStore which is
> > > local
> > > > > rather than remote).
> > > > > Obviously falling back on individual delete(key) operations for all
> > the
> > > > > data in the entire store will
> > > > > have worse performance, but that's better than silently breaking
> EOS
> > > when
> > > > > deleteAll is not available
> > > > > on a remote store.
> > > > >
> > > > > Would you be interested in doing a KIP for this? We should also
> file
> > a
> > > > JIRA
> > > > > with the above explanation,
> > > > > so that other users of remote storage are aware of this limitation.
> > And
> > > > > definitely document this somewhere
> > > > >
> > > > >
> > > > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna
> > >  > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Alex,
> > > > > >
> > > > > > I guess wiping out the state directory is easier code-wise,
> faster,
> > > > > > and/or at the time of development the developers did not design
> for
> > > > > > remote state stores. But I do actually not know the exact reason.
> > > > > >
> > > > > > Off the top of my head, I do not know how to solve this for
> remote
> > > > state
> > > > > > stores. Using the uncaught exception handler is not good,
> because a
> > > > > > computing node could fail without giving the JVM the opportunity
> to
> > > > > > throw an exception.
> > > > > >
> > > > > > In your tests, try to increase the commit interval to a high
> value
> > > and
> > > > > > see if you get inconsistencies. You should get an inconsistency
> if
> > > the
> > > > > > state store maintains counts for keys and after the last commit
> > > before
> > > > > > the failure, the Streams app puts an event with a new key K with
> > > value
> > > > 1
> > > > > > into the state store. After failover, Streams would put the same
> > > event
> > > > > > with key K again into the state store. If the state store deleted
> > all
> > > > of
> > > > > > its data, Streams would 

Re: [VOTE] 2.6.2 RC0

2021-03-15 Thread Guozhang Wang
Hi Sophie,

I've reviewed the javadocs / release notes / documentations, and they LGTM.

+1.


Guozhang

On Fri, Mar 12, 2021 at 10:48 AM Sophie Blee-Goldman
 wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the first candidate for release of Apache Kafka 2.6.2.
>
> Apache Kafka 2.6.2 is a bugfix release and fixes 30 issues since the 2.6.1
> release. Please see the release notes for more information.
>
> Release notes for the 2.6.2 release:
> https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/RELEASE_NOTES.html
>
> *** Please download, test and vote by Friday, March 19th, 9am PST
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> https://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/javadoc/
>
> * Tag to be voted upon (off 2.6 branch) is the 2.6.2 tag:
> https://github.com/apache/kafka/releases/tag/2.6.2-rc0
>
> * Documentation:
> https://kafka.apache.org/26/documentation.html
>
> * Protocol:
> https://kafka.apache.org/26/protocol.html
>
> * Successful Jenkins builds for the 2.6 branch:
> Unit/integration tests:
> https://ci-builds.apache.org/job/Kafka/job/kafka-2.6-jdk8/105/
> System tests:
> https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4435/
>
> /**
>
> Thanks,
> Sophie
>


-- 
-- Guozhang


Re: Redis as state store

2021-03-15 Thread Guozhang Wang
and
> > > > was
> > > > > testing it with EOS enabled.  (forcing crashes to occur and
> checking
> > > that
> > > > > the result of my aggregation was still accurate)  I was unable to
> > cause
> > > > > inconsistent data in the mongo store (which is good!), though of
> > > course I
> > > > > may just have been getting lucky.  Thanks again for your help,
> > > > >
> > > > > Alex
> > > > >
> > > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole <
> pdeole2...@gmail.com>
> > > > wrote:
> > > > >
> > > > >> Bruno,
> > > > >>
> > > > >> i tried to explain this in 'kafka user's language through above
> > > > mentioned
> > > > >> scenario, hope i put it properly -:) and correct me if i am wrong
> > > > >>
> > > > >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole <
> pdeole2...@gmail.com
> > >
> > > > >> wrote:
> > > > >>
> > > > >>> This is what I understand could be the issue with external state
> > > store:
> > > > >>>
> > > > >>> kafka stream application consumes source topic, does processing,
> > > stores
> > > > >>> state to kafka state store (this is backed by topic) and before
> > > > producing
> > > > >>> event on destination topic, the application fails with some
> issue.
> > In
> > > > >>> this case, the transaction has failed, so kafka guarantees either
> > all
> > > > or
> > > > >>> none, means offset written to source topic, state written to
> state
> > > > store
> > > > >>> topic, output produced on destination topic... all of these
> happen
> > or
> > > > >> none
> > > > >>> of these and in this failure scenario it is none of these.
> > > > >>>
> > > > >>> Assume you have redis state store, and you updated the state into
> > > redis
> > > > >>> and stream application failed. Now, you have source topic and
> > > > destination
> > > > >>> topic consistent i.e. offset is not committed to source topic and
> > > > output
> > > > >>> not produced on destination topic, but you redis state store is
> > > > >>> inconsistent with that since it is external state store and kafka
> > > can't
> > > > >>> guarantee rollback ot state written there
> > > > >>>
> > > > >>> On Mon, Mar 15, 2021 at 6:30 PM Alex Craig <
> alexcrai...@gmail.com>
> > > > >> wrote:
> > > > >>>
> > > > >>>> " Another issue with 3rd party state stores could be violation
> of
> > > > >>>> exactly-once guarantee provided by kafka streams in the event
> of a
> > > > >> failure
> > > > >>>> of streams application instance"
> > > > >>>>
> > > > >>>> I've heard this before but would love to know more about how a
> > > custom
> > > > >>>> state
> > > > >>>> store would be at any greater risk than RocksDB as far as
> > > exactly-once
> > > > >>>> guarantees are concerned.  They all implement the same
> interface,
> > so
> > > > as
> > > > >>>> long as you're correctly implementing get(), put(), delete(),
> > > flush(),
> > > > >>>> etc,
> > > > >>>> you should be fine right?  In other words, I don't think there
> is
> > > any
> > > > >>>> special "exactly once magic" that is baked into the RocksDB
> store
> > > > >> code.  I
> > > > >>>> could be wrong though so I'd love to hear people's thoughts,
> > thanks,
> > > > >>>>
> > > > >>>> Alex C
> > > > >>>>
> > > > >>>> On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan <
> > > > mpart...@hpe.com>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>>> Thanks for the responses. In the worst case, I might have to
> keep
> > > > both
> > > > >&g

Re: Redis as state store

2021-03-12 Thread Guozhang Wang
Hello Mohan,

I think what you had in mind works with Redis: since it is a remote state
store engine, it does not have the co-partitioning requirements of local
state stores.

One thing to watch out for, though, is that with remote stores the
processing latency may be larger, and since Kafka Streams processes all
records of a single partition in order, synchronously, you may need to tune
the poll interval configs etc. to make sure KS stays in the consumer group
and does not trigger unnecessary rebalances.
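
For example, the consumer-side knobs can be passed through the Streams config
with the consumer prefix; the values below are placeholders that would need
tuning to the remote store's latency, and `props` is assumed to be the
Properties passed to KafkaStreams:

// Give each poll loop more headroom when every get/put goes over the network.
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600000);
// Fetch fewer records per poll so a single batch cannot exceed the poll interval.
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);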

Guozhang

On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan 
wrote:

> Hi,
>
> I have a use case where messages come in with some key gets assigned some
> partition and the state gets created. Later, key changes (but still
> contains the old key in the message) and gets sent to a different
> partition. I want to be able to grab the old state using the old key before
> creating the new state on this instance. Redis as a  state store makes it
> easy to implement this where I can simply do a lookup before creating the
> state. I see an implementation here :
> https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
>
> Has anyone tried this ? Any caveats.
>
> Thanks
> Mohan
>
>

-- 
-- Guozhang


Re: Abort transaction semantics

2021-03-11 Thread Guozhang Wang
Hello Peter,

The resetToLastCommittedPositions is for cases where the app is still up
and running, as a convenient way to continue processing; if the app has
crashed, then upon restarting it would reset its position by reading the
committed offsets, which correspond to the last succeeded transaction and
hence would not include the last failed txn right before the crash.
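
For the still-up-and-running case, such a reset amounts to roughly the
following sketch (error handling omitted; the method body is illustrative):

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public final class TxnResetSketch {
    // Seek every assigned partition back to its last committed offset, or to the
    // beginning if nothing has ever been committed for it.
    static void resetToLastCommittedPositions(final KafkaConsumer<String, String> consumer) {
        final Map<TopicPartition, OffsetAndMetadata> committed =
                consumer.committed(consumer.assignment());
        for (final TopicPartition tp : consumer.assignment()) {
            final OffsetAndMetadata offset = committed.get(tp);
            if (offset != null) {
                consumer.seek(tp, offset.offset());
            } else {
                consumer.seekToBeginning(Collections.singleton(tp));
            }
        }
    }
}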

Or you can think about it in another way: whenever the transaction needs
to be aborted, instead of following the example tutorial we provide, one
can also pursue another more extreme way, which is "always system.exit(1)
immediately, and then try to restart the app" (:P). That would basically
fall into the crash scenario as well and is still correct --- you just pay
a bigger perf hit from crashing and restarting the app.


Guozhang


On Tue, Mar 9, 2021 at 3:06 AM Peter Cipov 
wrote:

> Hello
>
> Thanks for advice, I have looked into examples and spotted yet another
> place that is not clear to me. It is resetToLastCommittedPositions (in case
> of abort). I am wondering what will happen when this method crashes, so it
> will not reset consumer to latest committed offsets. Let's say the process
> is killed in that instant. In my case this will schedule a new process that
> creates a new consumer (assume rebalance will assign him the same
> partitions, nobody else touched them). In such cases the offsets will be
> shifted or not ?
>
> I am wondering where is the place in the transaction schema where consumer
> offsets are committed to kafka.
>
> Thank you
>
> Peter
>
> On Fri, Feb 19, 2021 at 9:23 PM Guozhang Wang  wrote:
>
> > Hello Peter,
> >
> > Note that when you upgrade from 2.4 to later versions in Kafka, your
> error
> > handling could be modified and simplified a bit as well. You can read the
> > example code in KIP-691 as a reference:
> >
> >
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/KAFKA/KIP-691*3A*Enhance*Transactional*Producer*Exception*Handling__;JSsrKysr!!NCc8flgU!IcJMrFZbaTy6WKg7C9KUzh2MsRne_VKrStjpn4MNlzSaDi5-5VUV2NIntQ7pDQ$
> >
> >
> > Guozhang
> >
> > On Fri, Feb 19, 2021 at 1:23 AM Peter Cipov 
> > wrote:
> >
> > > This was really helpful.
> > >
> > > Thank you
> > >
> > > On Thu, Feb 18, 2021 at 8:08 PM Boyang Chen <
> reluctanthero...@gmail.com>
> > > wrote:
> > >
> > > > Thanks for the question. I think Gary provided an excellent answer.
> > > > Additionally, you could check out the code example
> > > > <
> > > >
> > >
> >
> https://urldefense.com/v3/__https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java*L132__;Iw!!NCc8flgU!LgslseBAQdNQtCsX-YGUPoAM1BQ4jZYO6p5X9Tjnzow_pVJrKZA9dJkdRzqerg$
> > > > >
> > > > for EOS, which shows you how to reset the state while aborting
> ongoing
> > > > transactions.
> > > >
> > > > On Thu, Feb 18, 2021 at 11:01 AM Gary Russell 
> > > wrote:
> > > >
> > > > > You have to perform seeks (using the consumer) to the lowest
> > > unprocessed
> > > > > offset for each partition returned by the poll, before the next
> poll.
> > > > > 
> > > > > From: Peter Cipov 
> > > > > Sent: Thursday, February 18, 2021 1:20 PM
> > > > > To: users@kafka.apache.org 
> > > > > Subject: Abort transaction semantics
> > > > >
> > > > > Hello
> > > > > I have a question regarding aborting transactions in kafka client
> > > 2.4.1.
> > > > >
> > > > > lets have following code :
> > > > >
> > > > > ... propper transaction producer consumer creation, consumer
> > > autocommit =
> > > > > false
> > > > >
> > > > > producer.transactionInit();
> > > > >
> > > > > while(true) {
> > > > >   records = consumer.poll();
> > > > >   logRecordOffsets(records)
> > > > >   producer.beginTransaction()
> > > > >   try {
> > > > >doMagic()
> > > > >   } catch{
> > > > > producer.AbortTransaction();
> > > > > continue;
> > > > >   }
> > > > >   producer.sendOffsets(..);
> > > > >   producer.commitTransaction()
> > > > > }
> > > > >
> > > > > When doMagic crashes for some reason, abort is called and code will
> > > start
> > > > > from beginning with doing poll.
> > > > >
> > > > > Our assumption was that the next poll will start from the same
> > offsets,
> > > > but
> > > > > as we saw from logs this is not the case. What we observed that
> > offsets
> > > > are
> > > > > shifted and messages are lost, they will not be retried again.
> > > > >
> > > > > What is the semantics for abort, we could not figure out from
> > > > > documentation.
> > > > > What is the recommended approach for retrying ?
> > > > >
> > > > > Thank you
> > > > >
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: Window Store

2021-02-23 Thread Guozhang Wang
Sorry I was not very clear before: by "WindowStore" I meant implementing
your own customized store based on a kvStore where the key is a combo
<timestamp, key>. Note you put the timestamp first and then the key in your
serialization format, so that you can range-fetch with just the prefix on
the timestamp. In fact the `WindowStore` that we provide also follows this
design principle, but its combo key is in <key, timestamp> order, so a range
fetch is not as efficient since you'd need to fetch a much larger range and
then filter out a lot of records.
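
A minimal sketch of that combo-key layout on a plain KeyValueStore<Bytes, byte[]>;
the String keys and the store/type choices are illustrative assumptions:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public final class ComboKeys {

    // Serialize <timestamp, key>: big-endian timestamp first, so byte-lexicographic
    // ordering equals time ordering (for non-negative timestamps).
    static Bytes comboKey(final long timestampMs, final String key) {
        final byte[] k = key.getBytes(StandardCharsets.UTF_8);
        final ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + k.length);
        buf.putLong(timestampMs);
        buf.put(k);
        return Bytes.wrap(buf.array());
    }

    // Range-fetch everything with timestamp below cutoffMs and remove it from the store.
    static List<Bytes> expireBefore(final KeyValueStore<Bytes, byte[]> store, final long cutoffMs) {
        final List<Bytes> expired = new ArrayList<>();
        try (KeyValueIterator<Bytes, byte[]> it =
                     store.range(comboKey(0L, ""), comboKey(cutoffMs, ""))) {
            while (it.hasNext()) {
                expired.add(it.next().key);
            }
        }
        expired.forEach(store::delete);
        return expired;
    }
}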


Guozhang

On Tue, Feb 23, 2021 at 4:04 PM Navneeth Krishnan 
wrote:

> Thanks Guozhang.
>
> I don't see the remove method in window stores. Am I missing something? It
> would be very nice to implement the optimization you had mentioned.
>
> Thanks
>
> On Tue, Feb 23, 2021 at 11:11 AM Guozhang Wang  wrote:
>
> > I see. In that case I think your design with a KVstore plus a
> book-keeping
> > window store would work better. One minor optimization you can try
> though,
> > is that instead of trying to check if the TTL has changed or not when
> > expiring from the window store, you can try to delete from the window
> store
> > whenever you are updating the kv-store. More specifically, when you
> update
> > the kv-store, do sth. like this:
> >
> > value = kvStore.get(k);  // here value also encodes the timestamp, e.g.
> see
> > "TimestampedKeyValueStore" interface
> > if (value != v)
> >   // v is the new value you want to put
> >   windowStore.remove(combo-key); // here the combo-key is a  > key> where timestamp is extracted from value
> >
> > kvStore.put(k, v)
> > kvStore.put(combo-key);  // it is in 
> >
> > Later when you expire, you do not need to check on kvStore if the value's
> > timestamp has changed or not.
> >
> >
> >
> >
> > On Sun, Feb 21, 2021 at 9:17 AM Navneeth Krishnan <
> > reachnavnee...@gmail.com>
> > wrote:
> >
> > > Thanks Liam & Guozhang.
> > >
> > > First of all, we use PAPI in our entire topology and we would like to
> > > retain it that way rather than combining with DSL. Secondly, even I was
> > > more leaning towards session store but the problem I found with session
> > > store is I cannot get all the expired sessions without keys where as
> > > windowstore has the option to get all keys by range. Ideally I would
> like
> > > to have a punctuate function which finds all the expired records and
> send
> > > it to downstream. I looked at KStreamSessionWindowAggregate but it
> looks
> > > like we need a new value coming in for the key to even send updates. In
> > my
> > > case there might not be any activity at all but I still need to send
> the
> > > delete event.
> > >
> > > Here is how we want it to work
> > > T -> User1 (Active event)
> > > T+5 -> User1 (Active event)
> > > T+15 -> User1 (Delete event - Since the user is inactive for a 10 min
> > > period)
> > >
> > > Thanks
> > >
> > > On Fri, Feb 19, 2021 at 12:19 PM Guozhang Wang 
> > wrote:
> > >
> > > > Hello Navneeth,
> > > >
> > > > I would agree with Liam that a session store seems a good fit for
> your
> > > > case. But note that session stores would not expire a session
> > themselves
> > > > and it is still the processor node's job to find those already
> expired
> > > > sessions and emit results / delete. You can take a look at
> > > > the KStreamSessionWindowAggregate inside Kafka code base (
> > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
> > > > )
> > > > for a reference.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Thu, Feb 18, 2021 at 1:21 PM Liam Clarke-Hutchinson <
> > > > liam.cla...@adscale.co.nz> wrote:
> > > >
> > > > > Hmmm, thanks Navneeth,
> > > > >
> > > > > I feel like a session store set to an inactivity period of 10
> > minutes,
> > > > > suppressed until session window closed, combined with a
> GlobalKTable
> > > > would
> > > > > be how I'd start to approach this in the DSL, with the below
> > topology.
> > > I
> > > > > have no idea if my ASCII art below will survive email formatting,
> so
> > > I'll
> > > > 

Re: NullPointerException after upgrading from 2.5.1 to 2.6.1 in my stream app

2021-02-23 Thread Guozhang Wang
Is it possible that the flattened values contain `null` and hence `_.split`
throws?

On Tue, Feb 23, 2021 at 8:23 AM Nitay Kufert  wrote:

> Hey, missed your reply - but the code I've shared above the logs is the
> code around those lines (removed some identifiers to make it a little bit
> more generic):
>
> > inputStream.flatMapValues(_.split).peek((k, v) => {val _ = $k ->
> > ${v.printForDebug}")}) # return type KStream[Windowed[String],
> > SingleInputMessage]
>
>
> On Fri, Jan 29, 2021 at 9:01 AM Guozhang Wang  wrote:
>
> > Could you share your code around
> >
> > >
> >
> >
> com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> >
> > That seems to be where NPE is thrown.
> >
> >
> > On Wed, Jan 13, 2021 at 5:46 AM Nitay Kufert 
> wrote:
> >
> > > Hey,
> > > *Without any code change*, just by bumping the kafka version from 2.5.1
> > to
> > > 2.6.1 (clients only) - my stream application started throwing
> > > NullPointerException (sometimes, not in a predicted pattern).
> > > Maybe it's worth mentioning that I also removed the "UPGRADE_FROM" conf
> > > that was forgotten there from the older versions.
> > >
> > > We are using Scala 2.12, and the line that throws this exception is
> using
> > > flatMapValues:
> > >
> > >
> > > >  inputStream.flatMapValues(_.split) # return type
> > > > KStream[Windowed[String], SingleInputMessage]
> > >
> > >
> > > Where inputStream is of type: KStream[Windowed[String], InputMessage]
> and
> > > the split method splits this InputMessage into several
> > > SingleInputMessage messages (hence the flat - to avoid
> > > List[SingleInputMessage]).
> > >
> > > The exception:
> > >
> > > > java.lang.NullPointerException: null Wrapped by:
> > > > org.apache.kafka.streams.errors.StreamsException: Exception caught in
> > > > process. taskId=2_2,
> processor=unique_input_message-repartition-source,
> > > > topic=service-unique_input_message-repartition, partition=2,
> > > > offset=318846738, stacktrace=java.lang.NullPointerException
> > > >
> > >
> > > java.lang.NullPointerException: null at
> > > >
> > >
> >
> com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> > > > at
> > > >
> > >
> >
> org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
> > > > at
> > > >
> > >
> >
> org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
> > > > at
> > > >
> > >
> >
> org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
> > > > at
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> > > > at
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > > > at
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> > > > at
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> > > > at
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> > > > at
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> > > > at
> > > >
> > >
> >
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
> > > > at
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> > > > at
> > > >
> > >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > > > at
> > > >
> > >
> >
> org.apache.kafka.streams

Re: Window Store

2021-02-23 Thread Guozhang Wang
I see. In that case I think your design with a KVstore plus a book-keeping
window store would work better. One minor optimization you can try though,
is that instead of trying to check if the TTL has changed or not when
expiring from the window store, you can try to delete from the window store
whenever you are updating the kv-store. More specifically, when you update
the kv-store, do sth. like this:

value = kvStore.get(k);  // here value also encodes the timestamp, e.g. see
the "TimestampedKeyValueStore" interface
if (value != v)
  // v is the new value you want to put
  windowStore.remove(comboKey);  // the combo key is a <k, timestamp> pair, where the timestamp is extracted from the old value

kvStore.put(k, v)
windowStore.put(comboKey)  // the new combo key is <k, new timestamp>

Later when you expire entries from the window store, you do not need to check
the kv-store to see whether the value's timestamp has changed or not.
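
Something like the following, untested Java sketch shows how the whole pattern could hang together in the PAPI -- the store names, serde types, the 10-minute TTL, and the assumption that putting a null value acts as a delete on the built-in window store are all placeholders to verify against your own setup:

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;

public class TtlProcessor extends AbstractProcessor<String, String> {
    private static final long TTL_MS = Duration.ofMinutes(10).toMillis();
    private TimestampedKeyValueStore<String, String> kvStore;
    private WindowStore<String, String> bookkeepingStore;

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        kvStore = (TimestampedKeyValueStore<String, String>) context.getStateStore("latest-store");
        bookkeepingStore = (WindowStore<String, String>) context.getStateStore("ttl-store");

        // every minute, scan only the slice of the bookkeeping store that is older than the TTL
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, now -> {
            final List<Windowed<String>> expired = new ArrayList<>();
            try (final KeyValueIterator<Windowed<String>, String> iter = bookkeepingStore.fetchAll(
                    Instant.ofEpochMilli(0L), Instant.ofEpochMilli(now - TTL_MS))) {
                while (iter.hasNext()) {
                    expired.add(iter.next().key);
                }
            }
            for (final Windowed<String> entry : expired) {
                context.forward(entry.key(), null);                               // the "delete" event
                kvStore.delete(entry.key());
                bookkeepingStore.put(entry.key(), null, entry.window().start());  // clean up bookkeeping
            }
        });
    }

    @Override
    public void process(final String userId, final String value) {
        final ValueAndTimestamp<String> old = kvStore.get(userId);
        if (old != null) {
            bookkeepingStore.put(userId, null, old.timestamp());  // drop the stale combo entry
        }
        final long ts = context().timestamp();
        kvStore.put(userId, ValueAndTimestamp.make(value, ts));
        bookkeepingStore.put(userId, value, ts);
    }
}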




On Sun, Feb 21, 2021 at 9:17 AM Navneeth Krishnan 
wrote:

> Thanks Liam & Guozhang.
>
> First of all, we use PAPI in our entire topology and we would like to
> retain it that way rather than combining with DSL. Secondly, even I was
> more leaning towards session store but the problem I found with session
> store is I cannot get all the expired sessions without keys where as
> windowstore has the option to get all keys by range. Ideally I would like
> to have a punctuate function which finds all the expired records and send
> it to downstream. I looked at KStreamSessionWindowAggregate but it looks
> like we need a new value coming in for the key to even send updates. In my
> case there might not be any activity at all but I still need to send the
> delete event.
>
> Here is how we want it to work
> T -> User1 (Active event)
> T+5 -> User1 (Active event)
> T+15 -> User1 (Delete event - Since the user is inactive for a 10 min
> period)
>
> Thanks
>
> On Fri, Feb 19, 2021 at 12:19 PM Guozhang Wang  wrote:
>
> > Hello Navneeth,
> >
> > I would agree with Liam that a session store seems a good fit for your
> > case. But note that session stores would not expire a session themselves
> > and it is still the processor node's job to find those already expired
> > sessions and emit results / delete. You can take a look at
> > the KStreamSessionWindowAggregate inside Kafka code base (
> >
> >
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
> > )
> > for a reference.
> >
> >
> > Guozhang
> >
> > On Thu, Feb 18, 2021 at 1:21 PM Liam Clarke-Hutchinson <
> > liam.cla...@adscale.co.nz> wrote:
> >
> > > Hmmm, thanks Navneeth,
> > >
> > > I feel like a session store set to an inactivity period of 10 minutes,
> > > suppressed until session window closed, combined with a GlobalKTable
> > would
> > > be how I'd start to approach this in the DSL, with the below topology.
> I
> > > have no idea if my ASCII art below will survive email formatting, so
> I'll
> > > try to explain. User ids stream into the GlobalKTable, and also into
> the
> > > session store. After 10 minutes of inactivity for a given user id key,
> > the
> > > session expires, and the session store emits the user_id -> some_value.
> > I'd
> > > then map the some_value to null, to take advantage of KTable semantics
> > > where `k -> null` is treated as a delete for key k, so an inactive user
> > > would be deleted from the ktable. You could then periodically query the
> > > ktable's key-value store for outside emission.
> > >
> > > That said, this is only how I'd start to explore the problem, and there
> > are
> > > obvious questions that need to be answered first like how much state
> > would
> > > you end up storing in the session store, etc. I'm hoping someone like
> > John
> > > Roesler who has far better insights into Kafka Streams might weigh in
> > here.
> > >
> > >
> > > user ids -->
> > > globalktable < keyValueStore periodically queried.
> > >   \> session store > map (user_id -> null) --/
> > >
> > > Good luck,
> > >
> > > Liam
> > >
> > > On Thu, Feb 18, 2021 at 7:49 AM Navneeth Krishnan <
> > > reachnavnee...@gmail.com>
> > > wrote:
> > >
> > > > Hi Liam,
> > > >
> > > > The use case is stream all data and send it to storage after
> > processing.
> > > > Also when the user is inactive for a 10 min period then send a
> special
> &g

Re: Abort transaction semantics

2021-02-19 Thread Guozhang Wang
Hello Peter,

Note that when you upgrade from 2.4 to later versions in Kafka, your error
handling could be modified and simplified a bit as well. You can read the
example code in KIP-691 as a reference:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-691%3A+Enhance+Transactional+Producer+Exception+Handling
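
As a rough, fragmentary sketch of the seek-back approach Gary describes below (doMagic() is the placeholder from your example; the groupMetadata() overload of sendOffsetsToTransaction assumes clients 2.5+, on 2.4 you would pass the consumer group id string instead):

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

while (true) {
    final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    if (records.isEmpty()) {
        continue;
    }
    producer.beginTransaction();
    try {
        doMagic(records);
        producer.sendOffsetsToTransaction(nextOffsets(records), consumer.groupMetadata());
        producer.commitTransaction();
    } catch (final Exception e) {
        producer.abortTransaction();
        // rewind each polled partition to its first un-processed offset so the same
        // records are returned again by the next poll instead of being skipped
        for (final TopicPartition tp : records.partitions()) {
            consumer.seek(tp, records.records(tp).get(0).offset());
        }
    }
}

// offsets to commit are always "last processed offset + 1"
static Map<TopicPartition, OffsetAndMetadata> nextOffsets(final ConsumerRecords<String, String> records) {
    final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (final TopicPartition tp : records.partitions()) {
        final List<ConsumerRecord<String, String>> recs = records.records(tp);
        offsets.put(tp, new OffsetAndMetadata(recs.get(recs.size() - 1).offset() + 1));
    }
    return offsets;
}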


Guozhang

On Fri, Feb 19, 2021 at 1:23 AM Peter Cipov 
wrote:

> This was really helpful.
>
> Thank you
>
> On Thu, Feb 18, 2021 at 8:08 PM Boyang Chen 
> wrote:
>
> > Thanks for the question. I think Gary provided an excellent answer.
> > Additionally, you could check out the code example
> > <
> >
> https://urldefense.com/v3/__https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java*L132__;Iw!!NCc8flgU!LgslseBAQdNQtCsX-YGUPoAM1BQ4jZYO6p5X9Tjnzow_pVJrKZA9dJkdRzqerg$
> > >
> > for EOS, which shows you how to reset the state while aborting ongoing
> > transactions.
> >
> > On Thu, Feb 18, 2021 at 11:01 AM Gary Russell 
> wrote:
> >
> > > You have to perform seeks (using the consumer) to the lowest
> unprocessed
> > > offset for each partition returned by the poll, before the next poll.
> > > 
> > > From: Peter Cipov 
> > > Sent: Thursday, February 18, 2021 1:20 PM
> > > To: users@kafka.apache.org 
> > > Subject: Abort transaction semantics
> > >
> > > Hello
> > > I have a question regarding aborting transactions in kafka client
> 2.4.1.
> > >
> > > lets have following code :
> > >
> > > ... propper transaction producer consumer creation, consumer
> autocommit =
> > > false
> > >
> > > producer.transactionInit();
> > >
> > > while(true) {
> > >   records = consumer.poll();
> > >   logRecordOffsets(records)
> > >   producer.beginTransaction()
> > >   try {
> > >doMagic()
> > >   } catch{
> > > producer.AbortTransaction();
> > > continue;
> > >   }
> > >   producer.sendOffsets(..);
> > >   producer.commitTransaction()
> > > }
> > >
> > > When doMagic crashes for some reason, abort is called and code will
> start
> > > from beginning with doing poll.
> > >
> > > Our assumption was that the next poll will start from the same offsets,
> > but
> > > as we saw from logs this is not the case. What we observed that offsets
> > are
> > > shifted and messages are lost, they will not be retried again.
> > >
> > > What is the semantics for abort, we could not figure out from
> > > documentation.
> > > What is the recommended approach for retrying ?
> > >
> > > Thank you
> > >
> >
>


-- 
-- Guozhang


Re: Window Store

2021-02-19 Thread Guozhang Wang
Hello Navneeth,

I would agree with Liam that a session store seems a good fit for your
case. But note that session stores would not expire a session themselves
and it is still the processor node's job to find those already expired
sessions and emit results / delete. You can take a look at
the KStreamSessionWindowAggregate inside Kafka code base (
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java)
for a reference.


Guozhang

On Thu, Feb 18, 2021 at 1:21 PM Liam Clarke-Hutchinson <
liam.cla...@adscale.co.nz> wrote:

> Hmmm, thanks Navneeth,
>
> I feel like a session store set to an inactivity period of 10 minutes,
> suppressed until session window closed, combined with a GlobalKTable would
> be how I'd start to approach this in the DSL, with the below topology. I
> have no idea if my ASCII art below will survive email formatting, so I'll
> try to explain. User ids stream into the GlobalKTable, and also into the
> session store. After 10 minutes of inactivity for a given user id key, the
> session expires, and the session store emits the user_id -> some_value. I'd
> then map the some_value to null, to take advantage of KTable semantics
> where `k -> null` is treated as a delete for key k, so an inactive user
> would be deleted from the ktable. You could then periodically query the
> ktable's key-value store for outside emission.
>
> That said, this is only how I'd start to explore the problem, and there are
> obvious questions that need to be answered first like how much state would
> you end up storing in the session store, etc. I'm hoping someone like John
> Roesler who has far better insights into Kafka Streams might weigh in here.
>
>
> user ids -->
> globalktable < keyValueStore periodically queried.
>   \> session store > map (user_id -> null) --/
>
> Good luck,
>
> Liam
>
> On Thu, Feb 18, 2021 at 7:49 AM Navneeth Krishnan <
> reachnavnee...@gmail.com>
> wrote:
>
> > Hi Liam,
> >
> > The use case is stream all data and send it to storage after processing.
> > Also when the user is inactive for a 10 min period then send a special
> > event that marks the user as inactive. I'm trying to implement the
> special
> > event here.
> >
> > Thanks
> >
> >
> > On Tue, Feb 16, 2021 at 1:18 AM Liam Clarke-Hutchinson <
> > liam.cla...@adscale.co.nz> wrote:
> >
> > > Hey Navneeth,
> > >
> > > So to understand your problem better - do you only want to stream users
> > > active within 10 minutes to storage?
> > >
> > > Cheers,
> > >
> > > Liam
> > >
> > > On Tue, Feb 16, 2021 at 9:50 AM Navneeth Krishnan <
> > > reachnavnee...@gmail.com>
> > > wrote:
> > >
> > > > It’s just for emitting to data storage. There is no join here.
> > > >
> > > > Thanks
> > > >
> > > > On Mon, Feb 15, 2021 at 1:42 AM Liam Clarke-Hutchinson <
> > > > liam.cla...@adscale.co.nz> wrote:
> > > >
> > > > > Hi Navneeth,
> > > > >
> > > > > What is the purpose of holding these user records? Is it to join
> > > against
> > > > > other streams, or emit to data storage?
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Liam Clarke-Hutchinson
> > > > >
> > > > >
> > > > >
> > > > > On Mon, 15 Feb. 2021, 9:08 pm Navneeth Krishnan, <
> > > > reachnavnee...@gmail.com
> > > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > I have a question about how I can use window stores to achieve
> this
> > > use
> > > > > > case. Thanks for all the help.
> > > > > >
> > > > > > A user record will be created when the user first logins and the
> > > > records
> > > > > > needs to be cleaned up after 10 mins of inactivity. Thus for each
> > > user
> > > > > > there will be a TTL but the TTL value will be updated each time
> > when
> > > > the
> > > > > > user is active before he becomes inactive for the entire 10 min
> > > period.
> > > > > We
> > > > > > are currently using PAPI for all our topologies and I was
> thinking
> > of
> > > > > > implementing it using a punctuator.
> > > > > >
> > > > > > My initial logic was to have a KV store with each user as key and
> > TTL
> > > > as
> > > > > > the value and run a scheduled task every minute that looks at all
> > the
> > > > > > records which have TTL value lesser than the timestamp. But the
> > > problem
> > > > > in
> > > > > > this approach was performance. When there are more than 1M
> records
> > it
> > > > > takes
> > > > > > more than a few seconds to complete this task.
> > > > > >
> > > > > > Next approach is to have a window store and a KV store. Window
> > store
> > > > will
> > > > > > have each user and corresponding TTL rounded to the nearest
> minute.
> > > > Then
> > > > > > find all keys between the current time and current time - 1min.
> > Then
> > > > > > iterate these keys and use the KV store to find if the TTL value
> is
> > > > still
> > > > > > the same or if we have received any updates after that. 

Re: Kafka 2.7.0 processor API and streams-test-utils changes

2021-02-05 Thread Guozhang Wang
Hello Upesh,

Thanks for the report! I think we overlooked updating the
documentation for the new 2.7.0 release. Could you file a JIRA (or even
better, provide a PR with the JIRA :) to update the docs?


Guozhang

On Thu, Feb 4, 2021 at 1:03 PM Upesh Desai  wrote:

> Hello,
>
>
>
> I recently upgraded our Kafka components to 2.7.0, and noticed the changes
> to the processor API. Specifically, the additions of:
>
>
>
>- org.apache.kafka.streams.processor.api.Processor
>- org.apache.kafka.streams.processor.api.ProcessorContext
>
>
>
> The old Topology.addProcessor() method has been deprecated, which is what
> led me to finding the new classes. After porting our code to the updated
> processor API, we noticed issues with the Processor unit tests, which had
> been written follow this documentation exactly:
>
>
>
>
> https://kafka.apache.org/27/documentation/streams/developer-guide/testing.html#unit-testing-processors
>
>
>
> However, it seems that the MockProcessorContext and possibly other test
> suite classes have not been updated for the new API changes, such as the
> following methods:
>
>
>
> store.init(context, store);
>
> context.register(store, null);
>
>
>
> Can someone point me in the right direction if this has indeed been
> changed/fixed or need to raise an issue to have this updated in the next
> release?
>
>
>
> Cheers!
> 
> Upesh Desai​
> Senior Software Developer
> *ude...@itrsgroup.com* 
> *www.itrsgroup.com* 
> Internet communications are not secure and therefore the ITRS Group does
> not accept legal responsibility for the contents of this message. Any view
> or opinions presented are solely those of the author and do not necessarily
> represent those of the ITRS Group unless otherwise specifically stated.
> [itrs.email.signature]
>
>
> *Disclaimer*
>
> The information contained in this communication from the sender is
> confidential. It is intended solely for use by the recipient and others
> authorized to receive it. If you are not the recipient, you are hereby
> notified that any disclosure, copying, distribution or taking action in
> relation of the contents of this information is strictly prohibited and may
> be unlawful.
>
> This email has been scanned for viruses and malware, and may have been
> automatically archived by *Mimecast Ltd*, an innovator in Software as a
> Service (SaaS) for business. Providing a *safer* and *more useful* place
> for your human generated data. Specializing in; Security, archiving and
> compliance.
>


-- 
-- Guozhang


Re: NullPointerException after upgrading from 2.5.1 to 2.6.1 in my stream app

2021-01-28 Thread Guozhang Wang
Could you share your code around

>
com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)

That seems to be where NPE is thrown.


On Wed, Jan 13, 2021 at 5:46 AM Nitay Kufert  wrote:

> Hey,
> *Without any code change*, just by bumping the kafka version from 2.5.1 to
> 2.6.1 (clients only) - my stream application started throwing
> NullPointerException (sometimes, not in a predicted pattern).
> Maybe it's worth mentioning that I also removed the "UPGRADE_FROM" conf
> that was forgotten there from the older versions.
>
> We are using Scala 2.12, and the line that throws this exception is using
> flatMapValues:
>
>
> >  inputStream.flatMapValues(_.split) # return type
> > KStream[Windowed[String], SingleInputMessage]
>
>
> Where inputStream is of type: KStream[Windowed[String], InputMessage] and
> the split method splits this InputMessage into several
> SingleInputMessage messages (hence the flat - to avoid
> List[SingleInputMessage]).
>
> The exception:
>
> > java.lang.NullPointerException: null Wrapped by:
> > org.apache.kafka.streams.errors.StreamsException: Exception caught in
> > process. taskId=2_2, processor=unique_input_message-repartition-source,
> > topic=service-unique_input_message-repartition, partition=2,
> > offset=318846738, stacktrace=java.lang.NullPointerException
> >
>
> java.lang.NullPointerException: null at
> >
> com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> > at
> >
> org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
> > at
> >
> org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
> > at
> >
> org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> > at
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> > at
> >
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> > at
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> > at
> >
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> > at
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> > at
> >
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109)
> > at
> >
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> > at
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> > at
> >
> 

Re: Are Kafka and Kafka Streams right tools for my case?

2021-01-19 Thread Guozhang Wang
Hello,

I have observed several use cases similar to yours that are using Kafka /
Kafka Streams in production. That being said, your concerns are valid:

* Big messages: 5MB is indeed large, but not extremely big for Kafka. If it
is a single message of hundreds of MBs or over a GB, then it's a rather
different handling story (and you may need to consider chunking it). You
would still need to make sure Kafka is configured right with
max.message.bytes etc. to handle them; also you may need to tune your clients
on network sockets (like the buffer size, etc) for optimal networking
performance.

* External call in Streams: if the external service may be unavailable,
then your implementation should have a timeout scenario to either drop the
record or retry it later (some implementations would put it into a retry
queue, again stored as a Kafka topic, and then read from it later to
retry). Also note that Kafka Streams relies on the consumer to poll the records,
and if that `poll` call is not triggered in time because the external
API calls take too long, you'd need to configure `max.poll.interval.ms` to be long
enough for this. Another caveat I can think of is that Kafka Streams at the
moment does not have async-processing capabilities, i.e. if a single record
takes too long on an external call (or simply a local IO call), then it would
block all records after it --- so if a processing-capacity bottleneck is a
common case for you, you'd probably need to consider writing a custom
processor for async external calls yourself. In the future we do have plans
to support async processing in Kafka Streams though.
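
For illustration only, here is a sketch of such a custom processor that bounds the external call with a timeout and parks failures on a retry topic; the HTTP endpoint, sink names and String types are made-up placeholders, and the two child sinks would be wired up separately with Topology#addSink:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.To;

public class ExternalCallProcessor extends AbstractProcessor<String, String> {
    private final HttpClient client = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(2))
            .build();

    @Override
    public void process(final String key, final String value) {
        try {
            final HttpRequest request = HttpRequest.newBuilder(URI.create("http://legacy-api/process"))
                    .timeout(Duration.ofSeconds(2))  // keep well below max.poll.interval.ms
                    .POST(HttpRequest.BodyPublishers.ofString(value))
                    .build();
            final HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            context().forward(key, response.body(), To.child("reply-sink"));
        } catch (final Exception e) {
            // external API down or too slow: park the record on a retry topic to re-consume later
            context().forward(key, value, To.child("retry-sink"));
        }
    }
}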



Guozhang




On Tue, Jan 19, 2021 at 8:44 AM The Real Preacher  wrote:

> I'm new to Kafka and will be grateful for any advice We are updating a
> legacy application together with moving it from IBM MQ to something
> different.
>
>
> Application currently does the following:
>
>   * Reads batch XML messages (up to 5 MB)
>   * Parses it to something meaningful
>   * Processes data parallelizing this procedure somehow manually for
> parts of the batch. Involves some external legacy API calls
> resulting in DB changes
>   * Sends several kinds of email notifications
>   * Sends some reply to some other queue
>   * input messages are profiled to disk
>
>
> We are considering using Kafka with Kafka Streams as it is nice to
>
>   * Scale processing easily
>   * Have messages persistently stored out of the box
>   * Built-in partitioning, replication, and fault-tolerance
>   * Confluent Schema Registry to let us move to schema-on-write
>   * Can be used for service-to-service communication for other
> applications as well
>
>
> But I have some concerns.
>
>
> We are thinking about splitting those huge messages logically and
> putting them to Kafka this way, as from how I understand it - Kafka is
> not a huge fan of big messages. Also it will let us parallelize
> processing on partition basis.
>
>
> After that use Kafka Streams for actual processing and further on for
> aggregating some batch responses back using state store. Also to push
> some messages to some other topics (e.g. for sending emails)
>
>
> But I wonder if it is a good idea to do actual processing in Kafka
> Streams at all, as it involves some external API calls?
>
>
> Also I'm not sure what is the best way to handle the cases when this
> external API is down for any reason. It means temporary failure for
> current and all the subsequent messages. Is there any way to stop Kafka
> Stream processing for some time? I can see that there are Pause and
> Resume methods on the Consumer API, can they be utilized somehow in
> Streams?
>
>
> Is it better to use a regular Kafka consumer here, possibly adding
> Streams as a next step to merge those batch messages together? Sounds
> like an overcomplication
>
>
> Is Kafka a good tool for these purposes at all?
>
> Cheers,
> TRP
>
>

-- 
-- Guozhang


Re: [ANNOUNCE] Apache Kafka 2.7.0

2020-12-21 Thread Guozhang Wang
Thank you Bill !

Congratulations to the community.

On Mon, Dec 21, 2020 at 4:08 PM Randall Hauch  wrote:

> Fantastic! Thanks for driving the release, Bill.
>
> Congratulations to the whole Kafka community.
>
> On Mon, Dec 21, 2020 at 5:55 PM Gwen Shapira  wrote:
>
> > woooh!!!
> >
> > Great job on the release Bill and everyone!
> >
> > On Mon, Dec 21, 2020 at 8:01 AM Bill Bejeck  wrote:
> > >
> > > The Apache Kafka community is pleased to announce the release for
> Apache
> > > Kafka 2.7.0
> > >
> > > * Configurable TCP connection timeout and improve the initial metadata
> > fetch
> > > * Enforce broker-wide and per-listener connection creation rate
> (KIP-612,
> > > part 1)
> > > * Throttle Create Topic, Create Partition and Delete Topic Operations
> > > * Add TRACE-level end-to-end latency metrics to Streams
> > > * Add Broker-side SCRAM Config API
> > > * Support PEM format for SSL certificates and private key
> > > * Add RocksDB Memory Consumption to RocksDB Metrics
> > > * Add Sliding-Window support for Aggregations
> > >
> > > This release also includes a few other features, 53 improvements, and
> 91
> > > bug fixes.
> > >
> > > All of the changes in this release can be found in the release notes:
> > > https://www.apache.org/dist/kafka/2.7.0/RELEASE_NOTES.html
> > >
> > > You can read about some of the more prominent changes in the Apache
> Kafka
> > > blog:
> > > https://blogs.apache.org/kafka/entry/what-s-new-in-apache4
> > >
> > > You can download the source and binary release (Scala 2.12, 2.13) from:
> > > https://kafka.apache.org/downloads#2.7.0
> > >
> > >
> >
> ---
> > >
> > >
> > > Apache Kafka is a distributed streaming platform with four core APIs:
> > >
> > >
> > > ** The Producer API allows an application to publish a stream records
> to
> > > one or more Kafka topics.
> > >
> > > ** The Consumer API allows an application to subscribe to one or more
> > > topics and process the stream of records produced to them.
> > >
> > > ** The Streams API allows an application to act as a stream processor,
> > > consuming an input stream from one or more topics and producing an
> > > output stream to one or more output topics, effectively transforming
> the
> > > input streams to output streams.
> > >
> > > ** The Connector API allows building and running reusable producers or
> > > consumers that connect Kafka topics to existing applications or data
> > > systems. For example, a connector to a relational database might
> > > capture every change to a table.
> > >
> > >
> > > With these APIs, Kafka can be used for two broad classes of
> application:
> > >
> > > ** Building real-time streaming data pipelines that reliably get data
> > > between systems or applications.
> > >
> > > ** Building real-time streaming applications that transform or react
> > > to the streams of data.
> > >
> > >
> > > Apache Kafka is in use at large and small companies worldwide,
> including
> > > Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest,
> Rabobank,
> > > Target, The New York Times, Uber, Yelp, and Zalando, among others.
> > >
> > > A big thank you for the following 117 contributors to this release!
> > >
> > > A. Sophie Blee-Goldman, Aakash Shah, Adam Bellemare, Adem Efe Gencer,
> > > albert02lowis, Alex Diachenko, Andras Katona, Andre Araujo, Andrew
> Choi,
> > > Andrew Egelhofer, Andy Coates, Ankit Kumar, Anna Povzner, Antony
> Stubbs,
> > > Arjun Satish, Ashish Roy, Auston, Badai Aqrandista, Benoit Maggi, bill,
> > > Bill Bejeck, Bob Barrett, Boyang Chen, Brian Byrne, Bruno Cadonna, Can
> > > Cecen, Cheng Tan, Chia-Ping Tsai, Chris Egerton, Colin Patrick McCabe,
> > > David Arthur, David Jacot, David Mao, Dhruvil Shah, Dima Reznik,
> Edoardo
> > > Comar, Ego, Evelyn Bayes, feyman2016, Gal Margalit, gnkoshelev, Gokul
> > > Srinivas, Gonzalo Muñoz, Greg Harris, Guozhang Wang, high.lee,
> > huangyiming,
> > > huxi, Igor Soarez, Ismael Juma, Ivan Yurchenko, Jason Gustafson, Jeff
> > Kim,
> > > jeff kim, Jesse Gorzinski, jiameixie, Jim Galasyn, JoelWee, John
> Roesler,
> > > John Thomas, Jorge Esteban Quil

Re: Handling "uneven" network partitions

2020-12-14 Thread Guozhang Wang
Hello Stig,

I think there's an ordering of the events here, e.g.:

T0: network partition happens.
T1: record-1 received at the leader, at this time the ISR is still 3.
Leader will accept this record and wait for it to be replicated.
T100: after some elapsed time the leader decides to kick the other followers
out of the ISR.
T101: record-2 received at leader, at this time the leader will reject the
produce request with not-enough-replica error code.

So you can see that before T100, records reaching the leader are still being
accepted. Note that the min.insync.replicas / acks configurations do not impact
how the ISR itself is managed.

Guozhang



On Fri, Dec 11, 2020 at 5:34 AM Stig Rohde Døssing 
wrote:

>  Hi,
>
> We have a topic with min.insync.replicas = 2 where each partition is
> replicated to 3 nodes. We write to it using acks=all.
>
> We experienced a network malfunction, where leader node 1 could not reach
> replica 2 and 3, and vice versa. Nodes 2 and 3 could reach each other. The
> controller broker could reach all nodes, and external services could reach
> all nodes.
>
> What we saw was the ISR degrade to only node 1. Looking at the code, I see
> the ISR shrink when a replica has not caught up to the leader's LEO and it
> hasn't fetched for a while. My guess is the leader had messages that
> weren't yet replicated by the other nodes.
>
> Shouldn't min.insync.replicas = 2 and acks=all prevent the ISR shrinking to
> this size, since new writes should not be accepted unless they are
> replicated by at least 2 nodes?
>


-- 
-- Guozhang


Re: [VOTE] 2.7.0 RC5

2020-12-14 Thread Guozhang Wang
I checked the docs and ran unit tests, no red flags found. +1.

On Fri, Dec 11, 2020 at 5:45 AM Bill Bejeck  wrote:

> Updated with link to successful Jenkins build.
>
> * Successful Jenkins builds for the 2.7 branch:
>  Unit/integration tests:
>
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-2.7-jdk8/detail/kafka-2.7-jdk8/78/
>
> On Thu, Dec 10, 2020 at 5:17 PM Bill Bejeck  wrote:
>
> > Hello Kafka users, developers and client-developers,
> >
> > This is the sixth candidate for release of Apache Kafka 2.7.0.
> >
> > * Configurable TCP connection timeout and improve the initial metadata
> > fetch
> > * Enforce broker-wide and per-listener connection creation rate (KIP-612,
> > part 1)
> > * Throttle Create Topic, Create Partition and Delete Topic Operations
> > * Add TRACE-level end-to-end latency metrics to Streams
> > * Add Broker-side SCRAM Config API
> > * Support PEM format for SSL certificates and private key
> > * Add RocksDB Memory Consumption to RocksDB Metrics
> > * Add Sliding-Window support for Aggregations
> >
> > This release also includes a few other features, 53 improvements, and 84
> > bug fixes.
> >
> > Release notes for the 2.7.0 release:
> > https://home.apache.org/~bbejeck/kafka-2.7.0-rc5/RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Friday, December 18, 12 PM ET ***
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > https://kafka.apache.org/KEYS
> >
> > * Release artifacts to be voted upon (source and binary):
> > https://home.apache.org/~bbejeck/kafka-2.7.0-rc5/
> >
> > * Maven artifacts to be voted upon:
> > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> >
> > * Javadoc:
> > https://home.apache.org/~bbejeck/kafka-2.7.0-rc5/javadoc/
> >
> > * Tag to be voted upon (off 2.7 branch) is the 2.7.0 tag:
> > https://github.com/apache/kafka/releases/tag/2.7.0-rc5
> >
> > * Documentation:
> > https://kafka.apache.org/27/documentation.html
> >
> > * Protocol:
> > https://kafka.apache.org/27/protocol.html
> >
> > * Successful Jenkins builds for the 2.7 branch:
> > Unit/integration tests: Link to follow
> >
> > Thanks,
> > Bill
> >
>


-- 
-- Guozhang


Re: Kafka Streams - Source Partition Assignment Issue

2020-12-14 Thread Guozhang Wang
Kafka Streams should evenly distribute the partitions, but there are some
issues in old versions of Kafka that you may be observing. To verify whether it
is a transient issue or a permanent one, I'd suggest you try:

1) bounce the instances that have no partitions assigned (not all of them at
once, but in a rolling manner), and see if after the rebalance they
are evenly distributed.

2) check that the app.id is set to the same value across all instances.

Also which version of Kafka are you using?


Guozhang




On Mon, Dec 14, 2020 at 11:29 AM Navneeth Krishnan 
wrote:

> Hi All,
>
> How does kafka streams partition assignment work for sources? I have a
> stream application reading from a topic which has 24 partitions. There are
> 6 application containers with 4 stream tasks in each container running but
> only 2 instances are assigned with partitions and even within the two, one
> has 21 partitions and the other has 3 partitions assigned. There is
> something wrong in the way the partitions are assigned. Is there a way to
> evenly distribute partitions?
>
> *Group Assignment:*
> userapp user-data   5  1716495057  1716504638
>9581
>
>  
> userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-1-consumer-428625d0-da9c-44d0-b499-79e698f5add0
> /172.21.82.140
>  userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-1-consumer
> userapp user-data   13 1735721509  1735730754
>9245
>
>  
> userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-1-consumer-428625d0-da9c-44d0-b499-79e698f5add0
> /172.21.82.140
>  userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-1-consumer
> userapp user-data   9  1729975595  1729984597
>9002
>
>  
> userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-1-consumer-428625d0-da9c-44d0-b499-79e698f5add0
> /172.21.82.140
>  userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-1-consumer
> userapp user-data   17 1736457935  1736466997
>9062
>
>  
> userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-1-consumer-428625d0-da9c-44d0-b499-79e698f5add0
> /172.21.82.140
>  userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-1-consumer
> userapp user-data   0  1917953948  1917963103
>9155
>
>  
> userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-1-consumer-428625d0-da9c-44d0-b499-79e698f5add0
> /172.21.82.140
>  userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-1-consumer
> userapp user-data   11 1719082482  1719085979
>3497
>
>  
> userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-4-consumer-a97f7c1f-f4a1-4b39-bd80-9b8c18dfd62a
> /172.21.82.140
>  userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-4-consumer
> userapp user-data   19 1755067810  1755071386
>3576
>
>  
> userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-4-consumer-a97f7c1f-f4a1-4b39-bd80-9b8c18dfd62a
> /172.21.82.140
>  userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-4-consumer
> userapp user-data   7  1729598367  1729601982
>3615
>
>  
> userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-4-consumer-a97f7c1f-f4a1-4b39-bd80-9b8c18dfd62a
> /172.21.82.140
>  userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-4-consumer
> userapp user-data   1  1949943357  1949947252
>3895
>
>  
> userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-4-consumer-a97f7c1f-f4a1-4b39-bd80-9b8c18dfd62a
> /172.21.82.140
>  userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-4-consumer
> userapp user-data   15 1751889619  1751893116
>3497
>
>  
> userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-4-consumer-a97f7c1f-f4a1-4b39-bd80-9b8c18dfd62a
> /172.21.82.140
>  userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-4-consumer
> userapp user-data   3  1752352594  1752356255
>3661
>
>  
> userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-4-consumer-a97f7c1f-f4a1-4b39-bd80-9b8c18dfd62a
> /172.21.82.140
>  userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-4-consumer
> userapp user-data   6  1693612741  1693621514
>8773
>
>  
> userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-2-consumer-59fe9d37-cd32-4a96-867b-c13e85cba398
> /172.21.82.140
>  userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-2-consumer
> userapp user-data   2  1923482321  1923490826
>8505
>
>  
> userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-2-consumer-59fe9d37-cd32-4a96-867b-c13e85cba398
> /172.21.82.140
>  userapp-aa8f851b-26b9-4dad-ad3c-333caeae388d-StreamThread-2-consumer
> userapp user-data   14 1677960791  1677969039
>8248
>
>  
> 

Re: Kafka Streams Optimizations

2020-12-14 Thread Guozhang Wang
Hello Navneeth,

Please find answers to your questions below.

On Sun, Dec 6, 2020 at 10:33 PM Navneeth Krishnan 
wrote:

> Hi All,
>
> I have been working on moving an application to kafka streams and I have
> the following questions.
>
> 1. We were planning to use an EFS mount to share rocksdb data for KV store
> and global state store with which we were hoping to minimize the state
> restore time when new instances are brought up. But later we found that
> global state stores require a lock so directories cannot be shared. Is
> there some way around this? How is everyone minimizing the state
> restoration time?
>
>
The common suggestion is to check if during restoration, the write-buffer
is flushed too frequently so that a lot of IOs are incurred for compaction:
generally speaking you'd want to have few large sst files instead of a lot
of small sst files. And the default config may lean towards the latter
case, which would slow down restoration.
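
If you want to experiment with that before those changes land, here is a minimal sketch of a RocksDBConfigSetter nudging the write buffer towards fewer, larger sst files -- the sizes below are purely illustrative and should be validated against your own workload and memory budget:

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class RestoreFriendlyRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        options.setWriteBufferSize(64 * 1024 * 1024L);  // larger memtable -> fewer, larger sst files
        options.setMaxWriteBufferNumber(3);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // nothing allocated above, so nothing to release here
    }
}

// registered via: props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RestoreFriendlyRocksDBConfig.class);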

We are currently working to make the following changes: use addSSTFiles API
of rocksDB to try to reduce the restoration IO cost; and consider moving
restoration to a different thread so that the stream thread would not block on
IO when this happens. Stay tuned for the next release.


> 2. Topology optimization: We are using PAPI and as per the docs topology
> optimization will have no effects on low level api. Is my understanding
> correct?
>

Correct.

>
> 3. There are about 5 KV stores in our stream application and for a few the
> data size is a bit larger. Is there a config to write data to the changelog
> topic only once a minute or something? I know it will be a problem in
> maintaining the data integrity. Basically we want to reduce the amount of
> changelog data written since we will have some updates for each user every
> 5 secs or so. Any suggestions on optimizations.
>
>
Currently, increasing the total cache size may help (configured via
"cache.max.bytes.buffering"). This is because the caching layer is at the
same time used for suppressing consecutive updates to the same key, and hence
it reduces the writes to the changelogs as well.


> 4. Compress data: Is there an option to compress the data being sent and
> consumed from kafka only for the intermediate topics. The major reason is
> we don't want to change the final sink because it's used by many
> applications. If we can just compress and write the data only for the
> intermediate topics and changelog that would be nice.
>
>
I think you can set the compression codec on a per-topic basis in Kafka.
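
Putting those two knobs together, a fragment of the Streams config could look like the below -- the values are illustrative only, and the "topic."-prefixed setting is applied just to the internal topics (changelogs and repartition topics) that Streams itself creates, leaving your sink topics untouched:

import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
// larger cache suppresses more consecutive updates per key before they hit the changelog
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 128 * 1024 * 1024L);
// broker-side compression for Streams-created internal topics only
props.put(StreamsConfig.topicPrefix(TopicConfig.COMPRESSION_TYPE_CONFIG), "lz4");
// note: StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG) would compress
// everything the app produces, including the final sink topics you want to leave unchanged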


> Thanks and appreciate all the help.
>
> Regards,
> Navneeth
>


-- 
-- Guozhang


Re: GlobalKTable restoration - Unexplained performance penalty

2020-12-01 Thread Guozhang Wang
Hello Nitay,

Sorry for the late reply.

Would you be able to share the actual log entries from which you infer the
elapsed time and the total number of records restored?


Guozhang


On Tue, Nov 24, 2020 at 3:44 AM Nitay Kufert  wrote:

> Hey,
> I get the log *after* the restart was triggered for my app (and my app
> actually restarted, meaning i get it as part of my app bootstrap logging)
>
> On Tue, Nov 24, 2020 at 12:03 AM Guozhang Wang  wrote:
>
> > Hello Nitay,
> >
> > Thanks for letting us know about your observations. When you restart the
> > application from empty local state Kafka Streams will try to restore all
> > the records up to the current log end for those global KTables, and
> during
> > this period of time there should be no processing.
> >
> > Do you mind sharing where you get the "totalRecordsToBeRestored", is it
> > before the restarting was triggered?
> >
> > Guozhang
> >
> > On Mon, Nov 23, 2020 at 4:15 AM Nitay Kufert 
> wrote:
> >
> > > Hey all,
> > > We have been running a kafka-stream based service in production for the
> > > last couple of years (we have 4 brokers on this specific cluster).
> > > Inside this service, which does a lot of stuff - there are 2
> > GlobalKTables
> > > (which are based on 2 compacted topics).
> > >
> > > When I restart my app and clean the local state - the restoration of
> > those
> > > topics begins, and the weird thing I am noticing is that for *much
> *fewer
> > > messages, 1 topic takes *a lot* more time to complete.
> > >
> > > Let's call the compacted topics A and B, both are compacted and mainly
> > > configured the same:
> > > Replication 3
> > > Number of Partitions 36
> > >
> > > max.message.bytes 2500
> > > segment.index.bytes 2097152
> > > min.cleanable.dirty.ratio 0.1
> > > cleanup.policy compact
> > > delete.retention.ms 90
> > > segment.bytes 52428800
> > > segment.ms 360
> > > *WIth the exception, that for topic B, we use *
> > > cleanup.policy compact,delete
> > > retention.ms 60480
> > > *The behaviour i've noticed for a single partition:*
> > > Topic A has 62,552 totalRecordsToBeRestored - and it takes around 20s
> > > Topic B has 24,730,506 totalRecordsToBeRestored - and it takes around
> 1s
> > >
> > > It is worth mentioning that the data that B holds for each record *is
> > much
> > > much bigger *(A holds an integer while B holds a big object)*.*
> > >
> > > Now, I get the feeling that the reason is because the data in B is
> always
> > > relatively "fresh", while the data in topic A is mostly stale (the
> > business
> > > logic behind it suggests that topic A updates at a very low rate - and
> a
> > > lot of keys would never be updated)
> > > So, for example, it's probably holding keys that haven't been updated
> > since
> > > 2018.
> > > Topic B keeps getting updated every couple of milliseconds.
> > >
> > > Another difference that I just realized i might share is that topic A
> is
> > > being "joined" by 6 other streams, while topic B is only being joined
> by
> > 2.
> > >
> > > I find it hard to explain the relation between keeping "old" records
> and
> > > the huge difference in number of records and their size.
> > >
> > > So I guess I am missing some basic concept when it comes to
> understanding
> > > compacted topics and the way the broker saves and fetches the data OR
> > maybe
> > > we have some underlying problem which can explain it.
> > >
> > > Let me know if you need some more info
> > >
> > > Thanks!
> > >
> > > --
> > >
> > > Nitay Kufert
> > > Backend Team Leader
> > > [image: ironSource] <http://www.ironsrc.com>
> > >
> > > email nita...@ironsrc.com
> > > mobile +972-54-5480021
> > > fax +972-77-5448273
> > > skype nitay.kufert.ssa
> > > 121 Menachem Begin St., Tel Aviv, Israel
> > > ironsrc.com <http://www.ironsrc.com>
> > > [image: linkedin] <https://www.linkedin.com/company/ironsource>
> [image:
> > > twitter] <https://twitter.com/ironsource> [image: facebook]
> > > <https://www.facebook.com/ironSource> [image: googleplus]
> > > <https://plus.google.com/+ironsrc>
> > > This email (including any attachm

Re: 在哪可以查看kafka各个版本的生命周期

2020-11-28 Thread Guozhang Wang
Hello Joson,

The EOL policy of Kafka is 3 releases, which is about a year:
https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan

And you can infer the life cycle of each release from their publish date
here: https://kafka.apache.org/downloads

Please use English to engage the community next time so that more people
can help you :)


Guozhang

On Sat, Nov 28, 2020 at 11:23 AM joson.y...@ce-service.com.cn <
joson.y...@ce-service.com.cn> wrote:

> 您好:
>
>
> 在哪可以查看kafka各个版本的生命周期
>
>
>
>
>

-- 
-- Guozhang


Re: GlobalKTable restoration - Unexplained performance penalty

2020-11-23 Thread Guozhang Wang
Hello Nitay,

Thanks for letting us know about your observations. When you restart the
application from empty local state Kafka Streams will try to restore all
the records up to the current log end for those global KTables, and during
this period of time there should be no processing.

Do you mind sharing where you get the "totalRecordsToBeRestored", is it
before the restarting was triggered?

Guozhang

On Mon, Nov 23, 2020 at 4:15 AM Nitay Kufert  wrote:

> Hey all,
> We have been running a kafka-stream based service in production for the
> last couple of years (we have 4 brokers on this specific cluster).
> Inside this service, which does a lot of stuff - there are 2 GlobalKTables
> (which are based on 2 compacted topics).
>
> When I restart my app and clean the local state - the restoration of those
> topics begins, and the weird thing I am noticing is that for *much *fewer
> messages, 1 topic takes *a lot* more time to complete.
>
> Let's call the compacted topics A and B, both are compacted and mainly
> configured the same:
> Replication 3
> Number of Partitions 36
>
> max.message.bytes 2500
> segment.index.bytes 2097152
> min.cleanable.dirty.ratio 0.1
> cleanup.policy compact
> delete.retention.ms 90
> segment.bytes 52428800
> segment.ms 360
> *WIth the exception, that for topic B, we use *
> cleanup.policy compact,delete
> retention.ms 60480
> *The behaviour i've noticed for a single partition:*
> Topic A has 62,552 totalRecordsToBeRestored - and it takes around 20s
> Topic B has 24,730,506 totalRecordsToBeRestored - and it takes around 1s
>
> It is worth mentioning that the data that B holds for each record *is much
> much bigger *(A holds an integer while B holds a big object)*.*
>
> Now, I get the feeling that the reason is because the data in B is always
> relatively "fresh", while the data in topic A is mostly stale (the business
> logic behind it suggests that topic A updates at a very low rate - and a
> lot of keys would never be updated)
> So, for example, it's probably holding keys that haven't been updated since
> 2018.
> Topic B keeps getting updated every couple of milliseconds.
>
> Another difference that I just realized i might share is that topic A is
> being "joined" by 6 other streams, while topic B is only being joined by 2.
>
> I find it hard to explain the relation between keeping "old" records and
> the huge difference in number of records and their size.
>
> So I guess I am missing some basic concept when it comes to understanding
> compacted topics and the way the broker saves and fetches the data OR maybe
> we have some underlying problem which can explain it.
>
> Let me know if you need some more info
>
> Thanks!
>
> --
>
> Nitay Kufert
> Backend Team Leader
> [image: ironSource] 
>
> email nita...@ironsrc.com
> mobile +972-54-5480021
> fax +972-77-5448273
> skype nitay.kufert.ssa
> 121 Menachem Begin St., Tel Aviv, Israel
> ironsrc.com 
> [image: linkedin]  [image:
> twitter]  [image: facebook]
>  [image: googleplus]
> 
> This email (including any attachments) is for the sole use of the intended
> recipient and may contain confidential information which may be protected
> by legal privilege. If you are not the intended recipient, or the employee
> or agent responsible for delivering it to the intended recipient, you are
> hereby notified that any use, dissemination, distribution or copying of
> this communication and/or its content is strictly prohibited. If you are
> not the intended recipient, please immediately notify us by reply email or
> by telephone, delete this email and destroy any copies. Thank you.
>


-- 
-- Guozhang


Re: High latency for Kafka transactions

2020-11-13 Thread Guozhang Wang
Hello John,

It's a bit hard to reason about your total produce traffic to the Kafka
brokers, e.g. how frequently do you send a `whole group of messages`? If it
is every 10ms, for example, then your traffic would be 100K per sec, which is
already quite large when using transactions.


Guozhang

On Fri, Nov 13, 2020 at 1:09 AM John Coleman  wrote:

> Hi,
>
> We have very high latency ~1 second when we send a small number of small
> messages to Kafka. The whole group of messages is less than 1k. We are
> thinking better to just send the messages together as 1 message containing
> a collection of all the messages.
>
> Is there something that we can tweak to reduce this latency? We have 10
> producers and they publish to a topic with 5 consumers. Be nice if we don’t
> have to change our message structure.
>
> TIA
> John
>


-- 
-- Guozhang


Re: Partitioning per team

2020-10-30 Thread Guozhang Wang
Hello Jan,

One alternative approach you can consider is to use a combo <teamId, userId> as
the key, so that you keep the small per-user aggregations, while customizing your
partitioner for the repartition topic such that keys with the same teamId
prefix always go to the same partition. Then when cleaning up data,
similarly within the store you can do a range scan on the teamId prefix and delete
all entries of that team when the team is removed.
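
Here is a sketch of the custom partitioning piece, assuming the combo key is serialized as a "teamId:userId" string; hashing only the teamId portion (mirroring the default partitioner's murmur2 hash, though any stable hash works) keeps all of a team's keys on the same partition:

import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.StreamPartitioner;

public class TeamPrefixPartitioner<V> implements StreamPartitioner<String, V> {
    @Override
    public Integer partition(final String topic, final String comboKey, final V value, final int numPartitions) {
        final String teamId = comboKey.split(":", 2)[0];
        return Utils.toPositive(Utils.murmur2(teamId.getBytes())) % numPartitions;
    }
}

// e.g. when writing to the repartition topic:
//   stream.to("team-events-by-user", Produced.with(Serdes.String(), valueSerde)
//       .withStreamPartitioner(new TeamPrefixPartitioner<>()));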

Guozhang




On Mon, Oct 26, 2020 at 1:39 PM Jan Bols  wrote:

> For a kafka-streams application, we keep data per team. Data from 2 teams
> never meet but within a team, data is highly integrated. A team has team
> members but also has several types of equipment.
> A team has a lifespan of about 1-3 days after which the team is removed and
> all data relating to that team should be evicted.
>
> How would you partition the data?
> - Using the team id as key for all streams seems not ideal b/c this means
> all aggregations need to happen per team involving a ser/deser of the
> entire team data. Suppose there's 10 team members and only 1 team member is
> sending events that need to be aggregated. In this case, we need a
> ser/deser of the entire aggregated team data. I'm afraid this would result
> in quite a bit of overhead because.
> - Using the user id or equipment id as key would result in much smaller
> aggregations but does mean quite a bit of repartitioning when aggregating
> and joining users of the same team.
>
> I ended up using the second approach, but I wonder if that was really a
> good idea b/c the entire streaming logic does become quite involved.
>
> What is your experience with this type of data?
>
> Best regards
> Jan
>


-- 
-- Guozhang


Re: Kafka Streams : Rack Aware deployment in EC2

2020-10-20 Thread Guozhang Wang
Hello Tony,

I think you already know the consumer-client side fetch-from-follower
feature:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica

Say your Kafka deployment spans multiple AZs, and your Streams app is also
deployed cross-AZ; then by setting "client.rack" through the Streams configs,
you can let the embedded consumers read from the closest replicas within the
same AZ as well.
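
A hedged example of what that looks like on the Streams side (the AZ id is a placeholder and must match the broker.rack of the brokers in that zone; it also assumes clients on 2.3+ and brokers configured with replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector per KIP-392):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

// in the Streams Properties; the "consumer."-prefixed config is picked up by the
// embedded main, restore and global consumers
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.CLIENT_RACK_CONFIG), "us-east-1a");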

Guozhang


On Mon, Oct 19, 2020 at 9:19 PM Tony John  wrote:

> Thanks Matthias.. I don't think I will be able to take it up.. Will wait
> for it to be available in future. :)
>
> On Mon, Oct 19, 2020 at 11:56 PM Matthias J. Sax  wrote:
>
> > No suggestions... Sorry.
> >
> > The task assignment algorithm cannot be customized.
> >
> > The only thing you _could_ do, is to pickup the ticket yourself (to get
> > the feature maybe in 2.8 release). Not sure if you would be interested
> > to contribute :)
> >
> >
> > -Matthias
> >
> > On 10/19/20 11:08 AM, Tony John wrote:
> > > Thanks for the quick response Matthias. We were planning to move to AWS
> > > MSK, and I think we will have to defer it until rack awareness is in
> > place
> > > for Kafka Streams. Reason being consumers consuming from brokers in
> > > different availability zones would result in high data transfer
> charges.
> > Do
> > > you have any suggestions?
> > >
> > > Thanks,
> > > Tony
> > >
> > > On Mon, Oct 19, 2020 at 11:08 PM Matthias J. Sax 
> > wrote:
> > >
> > >> There is no such feature for Kafka Streams yet.
> > >>
> > >> We have one ticket for rack aware standby task placement
> > >> (https://issues.apache.org/jira/browse/KAFKA-6718) but nobody is
> > working
> > >> on it.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 10/19/20 9:20 AM, Tony John wrote:
> > >>> Hi All,
> > >>>
> > >>> I have been trying to figure out some documentation on how to enable
> > rack
> > >>> awareness on a Kafka Streams app. I do see the broker.rack
> > configuration
> > >>> which needs to be done at the broker side and client.rack
> configuration
> > >> for
> > >>> the consumers. Is there any specific configuration which is required
> to
> > >>> enable rack awareness in a streams app? Please throw some light on
> > this.
> > >>>
> > >>> PS: I am using Kafka / Kafka Streams 2.2.1
> > >>>
> > >>> Thanks,
> > >>> Tony
> > >>>
> > >>
> > >
> >
>


-- 
-- Guozhang


[ANNOUNCE] New committer: Chia-Ping Tsai

2020-10-19 Thread Guozhang Wang
Hello all,

I'm happy to announce that Chia-Ping Tsai has accepted his invitation to
become an Apache Kafka committer.

Chia-Ping has been contributing to Kafka since March 2018 and has made 74
commits:

https://github.com/apache/kafka/commits?author=chia7712

He's also authored several major improvements, participated in the KIP
discussion and PR reviews as well. His major feature development includes:

* KAFKA-9654: Epoch based ReplicaAlterLogDirsThread creation.
* KAFKA-8334: Spiky offsetCommit latency due to lock contention.
* KIP-331: Add default implementation to close() and configure() for serde
* KIP-367: Introduce close(Duration) to Producer and AdminClients
* KIP-338: Support to exclude the internal topics in kafka-topics.sh command

In addition, Chia-Ping has demonstrated great diligence in fixing test
failures, and an impressive engineering attitude and taste in fixing tricky
bugs while keeping designs simple.

Please join me to congratulate Chia-Ping for all the contributions!


-- Guozhang


Re: [ANNOUNCE] New committer: David Jacot

2020-10-16 Thread Guozhang Wang
Congrats David!

Guozhang


On Fri, Oct 16, 2020 at 10:23 AM Raymond Ng  wrote:

> Congrats David!
>
> Cheers,
> Ray
>
> On Fri, Oct 16, 2020 at 10:08 AM Rajini Sivaram 
> wrote:
>
> > Congratulations, David!
> >
> > Regards,
> >
> > Rajini
> >
> > On Fri, Oct 16, 2020 at 5:45 PM Matthias J. Sax 
> wrote:
> >
> > > Congrats!
> > >
> > > On 10/16/20 9:25 AM, Tom Bentley wrote:
> > > > Congratulations David!
> > > >
> > > > On Fri, Oct 16, 2020 at 5:10 PM Bill Bejeck 
> wrote:
> > > >
> > > >> Congrats David! Well deserved.
> > > >>
> > > >> -Bill
> > > >>
> > > >> On Fri, Oct 16, 2020 at 12:01 PM Gwen Shapira 
> > > wrote:
> > > >>
> > > >>> The PMC for Apache Kafka has invited David Jacot as a committer,
> and
> > > >>> we are excited to say that he accepted!
> > > >>>
> > > >>> David Jacot has been contributing to Apache Kafka since July 2015
> (!)
> > > >>> and has been very active since August 2019. He contributed several
> > > >>> notable KIPs:
> > > >>>
> > > >>> KIP-511: Collect and Expose Client Name and Version in Brokers
> > > >>> KIP-559: Make the Kafka Protocol Friendlier with L7 Proxies:
> > > >>> KIP-570: Add leader epoch in StopReplicaReques
> > > >>> KIP-599: Throttle Create Topic, Create Partition and Delete Topic
> > > >>> Operations
> > > >>> KIP-496 Added an API for the deletion of consumer offsets
> > > >>>
> > > >>> In addition, David Jacot reviewed many community contributions and
> > > >>> showed great technical and architectural taste. Great reviews are
> > hard
> > > >>> and often thankless work - but this is what makes Kafka a great
> > > >>> product and helps us grow our community.
> > > >>>
> > > >>> Thanks for all the contributions, David! Looking forward to more
> > > >>> collaboration in the Apache Kafka community.
> > > >>>
> > > >>> --
> > > >>> Gwen Shapira
> > > >>>
> > > >>
> > > >
> > >
> >
>


-- 
-- Guozhang


Re: Contributor list registration request

2020-10-11 Thread Guozhang Wang
Hello Sheikh,

Your account username has been added to the contributor list. Cheers.

Guozhang

On Sun, Oct 11, 2020 at 11:22 AM Sheikh Araf  wrote:

> Hi,
>
> Please add me to the contributor list. My JIRA account username is
> “arafsheikh”.
>
> Thanks in advance!
>
>

-- 
-- Guozhang


Re: Is this a valid use case for reading local store ?

2020-10-01 Thread Guozhang Wang
Mohan,

I think you can build a REST API on top of App1 directly, leveraging its
IQ interface. For some example code you can refer to
https://github.com/confluentinc/kafka-streams-examples/tree/6.0.0-post/src/main/java/io/confluent/examples/streams/interactivequeries
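
At its core, the REST layer just wraps a local store lookup. A minimal
sketch, where the store name and the String/Long key and value types are
assumptions and the StoreQueryParameters API requires Streams 2.5+:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class IqLookupSketch {
    // Returns the locally held value for the key, or null if the key is
    // absent or hosted on another instance of the application.
    public static Long lookup(final KafkaStreams streams, final String key) {
        final ReadOnlyKeyValueStore<String, Long> store = streams.store(
            StoreQueryParameters.fromNameAndType(
                "my-store", QueryableStoreTypes.<String, Long>keyValueStore()));
        return store.get(key);
    }
}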

Guozhang

On Thu, Oct 1, 2020 at 10:40 AM Parthasarathy, Mohan 
wrote:

> Hi Guozhang,
>
> The async event trigger process is not running as a Kafka Streams
> application. It offers a REST interface where other applications post
> events, which in turn need to go through App1's state and result in
> requests to App2 via Kafka. Here is the diagram:
>
>    KafkaTopics ---> App1 ---> App2
>                      |
>                      V
>             REST ---> App3
>
> A REST call to App3 reads the local store of App1 (via IQ) and sends
> requests to App2 (through a Kafka topic, not shown above). Conceptually it
> looks the same as your use case. What do people do if a Kafka Streams
> application (App1) also has to offer a REST interface?
>
> -thanks
> Mohan
>
> On 9/30/20, 5:01 PM, "Guozhang Wang"  wrote:
>
> Hello Mohan,
>
>     If I understand correctly, your async event trigger process runs
>     outside of the streams application, and reads the state stores of
>     App1 through the interactive query interface, right? This is
>     actually a pretty common use case pattern for IQ :)
>
>
> Guozhang
>
> On Wed, Sep 30, 2020 at 1:22 PM Parthasarathy, Mohan  >
> wrote:
>
> > Hi,
> >
> > A traditional Kafka Streams application (App1) reads data from a Kafka
> > topic and does aggregations, resulting in some local state. The output
> > of this application is consumed by a different application (App2) for
> > doing a different task. Under some conditions, there is an external
> > trigger (async event) which needs to trigger requests for all the keys
> > in the local store to App2. To achieve this, we can read the local
> > stores from all the replicas and send the request to App2.
> >
> > This async event happens less frequently compared to the normal case
> > that leads to the state creation in the first place. Are there any
> > caveats to doing it this way? If not, any other suggestions?
> >
> > Thanks
> > Mohan
> >
> >
>
> --
> -- Guozhang
>
>
>

-- 
-- Guozhang


Re: Sign up email for apache kafka notifications

2020-09-30 Thread Guozhang Wang
Hello Josh,

Please subscribe yourself to the Kafka mailing list:
https://kafka.apache.org/contact

On Wed, Sep 30, 2020 at 9:36 AM 
wrote:

> Hello,
>
> This is an email to sign up for the notifications for apache kafka.
> Please advise if there are any further actions that I will need to take.
>
> Many thanks
> Josh
> Joshua Racey | Middleware Security & Control | Apprentice | Infrastructure
> Services
> Phone : 03301529774
> Email: joshua.ra...@barclayscorp.com
> Ground Floor | Turing House | BTC Radbroke | Knutsford | WA16 9EU
>


-- 
-- Guozhang


Re: Is this a valid use case for reading local store ?

2020-09-30 Thread Guozhang Wang
Hello Mohan,

If I understand correctly, your async event trigger process runs outside of
the streams application, and reads the state stores of App1 through the
interactive query interface, right? This is actually a pretty common use
case pattern for IQ :)
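
On the "read the local stores from all the replicas" part, here is a
minimal sketch (the store name is an assumption) of discovering the
application instances that host the store, whose REST endpoints the
trigger could then query; note that standby-replica awareness in IQ only
arrived with KIP-535 in 2.5.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.StreamsMetadata;

public class StoreHostDiscoverySketch {
    // Collects host:port endpoints of the application instances that host
    // the given store, so each can be queried over its IQ/REST layer.
    public static List<String> storeEndpoints(final KafkaStreams streams) {
        final Collection<StreamsMetadata> metadata =
            streams.allMetadataForStore("my-store"); // assumed store name
        final List<String> endpoints = new ArrayList<>();
        for (final StreamsMetadata md : metadata) {
            endpoints.add(md.host() + ":" + md.port());
        }
        return endpoints;
    }
}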


Guozhang

On Wed, Sep 30, 2020 at 1:22 PM Parthasarathy, Mohan 
wrote:

> Hi,
>
> A traditional Kafka Streams application (App1) reads data from a Kafka
> topic and does aggregations, resulting in some local state. The output of
> this application is consumed by a different application (App2) for doing a
> different task. Under some conditions, there is an external trigger (async
> event) which needs to trigger requests for all the keys in the local store
> to App2. To achieve this, we can read the local stores from all the
> replicas and send the request to App2.
>
> This async event happens less frequently compared to the normal case that
> leads to the state creation in the first place. Are there any caveats to
> doing it this way? If not, any other suggestions?
>
> Thanks
> Mohan
>
>

-- 
-- Guozhang


Re: Newly added topic or partitions are not assigned to running consumer groups using static membership

2020-09-27 Thread Guozhang Wang
That seems to be a bug indeed, I will reply on the ticket.
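
For anyone who wants to reproduce it, here is a minimal sketch of the setup
described in the ticket: a static-membership consumer subscribed via a
regex pattern. The bootstrap address, group id, instance id and topic
pattern are assumptions for illustration.

import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticPatternConsumerSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // assumed
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");             // assumed
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance-1");  // static member
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Pattern subscription; newly created matching topics should get assigned.
            consumer.subscribe(Pattern.compile("events-.*"));              // assumed pattern
            while (true) {
                consumer.poll(Duration.ofSeconds(1)).forEach(r ->
                    System.out.println(r.topic() + " -> " + r.value()));
            }
        }
    }
}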


Guozhang

On Thu, Sep 24, 2020 at 8:03 PM Fu, Tony  wrote:

> Is anyone seeing this problem as well (
> https://issues.apache.org/jira/browse/KAFKA-10513)? I think it also
> happens when new topics are created within the subscription pattern.
>
> Tony
>


-- 
-- Guozhang

