RE: [QUESTION] What to do about conflicted KIP numbers?

2024-06-17 Thread Welch, Matt
Thanks for the input Matthias,

I guess we will keep things as they are, to avoid adding yet another layer on 
top.
Eric has already taken the next available number and it's recorded on the wiki 
so I suppose there's no additional work required here.

Regards,
-Matt

-Original Message-
From: Matthias J. Sax  
Sent: Friday, June 14, 2024 3:35 PM
To: dev@kafka.apache.org
Subject: Re: [QUESTION] What to do about conflicted KIP numbers?

I don't think that there is an official guideline.

Personally, I would suggest that the corresponding KIP owners agree on who is 
keeping the conflicting number and who is changing it.

For the one changing the number, I would propose starting a new DISCUSS 
thread using the new number, to keep the KIP threads separate.

Not sure if there is a better way to handle this... just an idea of how I would 
do it.


Not sure if we can improve the wiki instructions to make the race 
condition less likely? It seems this would happen if two people look at 
the next KIP number, say X, but don't bump it right away to X+1 and 
publish their KIP with X a few hours/days later without verifying that X 
is still the next available KIP number?


-Matthias


On 6/14/24 3:10 PM, Welch, Matt wrote:
> Hi Kafka devs,
> 
> I submitted a KIP last week and encountered a KIP-process race condition 
> where my KIP number was consumed by another dev without updating the wiki 
> page containing KIPs: Kafka Improvement Proposals - Apache Kafka - Apache 
> Software 
> Foundation<https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals>
> 
> There are now at least three separate dev-list threads referencing this 
> conflicted KIP number so I'm concerned that discussion around this number 
> will now be permanently confusing due to the conflict and multiple concurrent 
> unrelated threads referencing the same KIP number.  I've intentionally kept 
> the KIP numbers out of this email to prevent yet another thread referencing 
> them.
> 
> While I'm happy to keep going with my existing KIP number, I was wondering if 
> I should "abandon" it and create a new one.
> This solution seems like it could create extra confusion, however, so what is 
> the best course of action here?
> 
> Thanks,
> Matt
> 


Re: [QUESTION] What to do about conflicted KIP numbers?

2024-06-14 Thread Matthias J. Sax

I don't think that there is an official guideline.

Personally, I would suggest that the corresponding KIP owners agree on who 
is keeping the conflicting number and who is changing it.


For the one changing the number, I would propose starting a new 
DISCUSS thread using the new number, to keep the KIP threads separate.


Not sure if there is a better way to handle this... just an idea of how I 
would do it.



Not sure if we can improve the wiki instructions to make the race 
condition less likely? It seems this would happen if two people look at 
the next KIP number, say X, but don't bump it right away to X+1 and 
publish their KIP with X a few hours/days later without verifying that X 
is still the next available KIP number?



-Matthias


On 6/14/24 3:10 PM, Welch, Matt wrote:

Hi Kafka devs,

I submitted a KIP last week and encountered a KIP-process race condition where my KIP 
number was consumed by another dev without updating the wiki page containing KIPs: 
Kafka Improvement Proposals - Apache Kafka - Apache Software 
Foundation

There are now at least three separate dev-list threads referencing this conflicted 
KIP number so I'm concerned that discussion around this number will now be 
permanently confusing due to the conflict and multiple concurrent unrelated 
threads referencing the same KIP number.  I've intentionally kept the KIP 
numbers out of this email to prevent yet another thread referencing them.

While I'm happy to keep going with my existing KIP number, I was wondering if I should 
"abandon" it and create a new one.
This solution seems like it could create extra confusion, however, so what is 
the best course of action here?

Thanks,
Matt



[QUESTION] What to do about conflicted KIP numbers?

2024-06-14 Thread Welch, Matt
Hi Kafka devs,

I submitted a KIP last week and encountered a KIP-process race condition where 
my KIP number was consumed by another dev without updating the wiki page 
containing KIPs: Kafka Improvement Proposals - Apache Kafka - Apache Software 
Foundation

There are now at least three separate dev-list threads referencing this conflicted 
KIP number so I'm concerned that discussion around this number will now be 
permanently confusing due to the conflict and multiple concurrent unrelated 
threads referencing the same KIP number.  I've intentionally kept the KIP 
numbers out of this email to prevent yet another thread referencing them.

While I'm happy to keep going with my existing KIP number, I was wondering if I 
should "abandon" it and create a new one.
This solution seems like it could create extra confusion, however, so what is 
the best course of action here?

Thanks,
Matt


[QUESTION] Move log start offset back in time for data recovery

2024-05-26 Thread bo gao
Hi,

I have one question about tiered storage: I have a 3.6 cluster with some log
segments that exist in the remote store with metadata, but Kafka's earliest
offset is more recent. Is there a way to move the log start offset back in
time? I wonder if we can still serve that data to consumers. If yes, I would
appreciate a few tips on how to do so and a pointer to any internal
script/tooling.

thanks!


[jira] [Created] (KAFKA-16648) Question: KIP-848 and KafkaTestKit.java

2024-04-30 Thread sanghyeok An (Jira)
sanghyeok An created KAFKA-16648:


 Summary: Question: KIP-848 and KafkaTestKit.java
 Key: KAFKA-16648
 URL: https://issues.apache.org/jira/browse/KAFKA-16648
 Project: Kafka
  Issue Type: Bug
Reporter: sanghyeok An
 Attachments: image-2024-04-30-19-19-12-316.png, 
image-2024-04-30-19-20-14-427.png

Hi, Kafka Team.
I am writing test code for the new rebalancing protocol proposed in KIP-848.

It works well in general. However, it does not work properly when creating 
an EmbeddedBroker using KafkaTestKit.java.

 
 
 
### Phenomena
 # Create a combined broker that acts as both controller and broker using 
KafkaTestKit.
 # The consumer calls subscribe() and poll() against the created broker.
 

At this time, the consumer successfully sends heartbeat requests to the broker. 
However, it never receives a partition assignment from the broker.
 
### What are my broker configs?
!image-2024-04-30-19-19-12-316.png|width=530,height=228!
 
### Actual Broker Config.
!image-2024-04-30-19-20-14-427.png|width=465,height=151!
I set controller.quorum.voters = 0@localhost:9093, but 0@0.0.0.0:0 is set instead, 
because of this code: 
(https://github.com/apache/kafka/blob/7c0a302c4da9d53a8fddc504a9fac8d8afecbec8/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java#L305-L307)
 
 
 
### My opinion.
I am not familiar with the broker's quorum, but it seems to be the problem.
 
I expect that when the consumer sends a poll request to the broker, the group 
coordinator broker assigns the topic/partitions and then runs the quorum for 
each epoch number.
 
However, it seems not to work because the controller to vote for is represented as 
0.0.0.0:0.
 
This setting does not work well when applied to containers in docker-compose.
Could this be the cause of the problem?
 
 
### Question
If {{controller.quorum.voters}} is set to {{0.0.0.0:0}} and I want to use 
consumer group rebalancing through KIP-848, what settings should be applied to 
the brokers and consumers?
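
For reference, a hedged sketch of the settings usually involved when trying the new 
protocol (the group-protocol property names below are from the KIP-848 preview in 
Kafka 3.7+; ids, ports, and listener names are illustrative):

# broker side (KRaft), e.g. server.properties
process.roles=broker,controller
node.id=0
controller.quorum.voters=0@localhost:9093
listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
advertised.listeners=PLAINTEXT://localhost:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# enable the new KIP-848 group protocol alongside the classic one
group.coordinator.rebalance.protocols=classic,consumer

# consumer side
group.protocol=consumer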



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [Question] About Kafka producer design decision making

2024-01-02 Thread Ismael Juma
I should also clarify that Chia-Ping  took it over from me originally and
improved the proposal significantly. I think he also got busy and hasn't
been able to spend time on it for a while though.

Ismael

On Tue, Jan 2, 2024 at 4:08 PM Ismael Juma  wrote:

> I had written a KIP a while back:
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=100829459
>
> Happy for someone to pick it up and drive it forward.
>
> Ismael
>
> On Tue, Jan 2, 2024 at 2:00 PM Justine Olshan 
> wrote:
>
>> Hey folks --
>>  I think this is a good conversation. Given we plan to drop support for
>> Java 8 in 4.0 this seems like a good time to consider this change. Perhaps
>> we should file a JIRA ticket and maybe create a KIP to discuss?
>>
>> One thing we should consider however, is if we want some of these
>> operations to be asynchronous. I know I got into some tricky areas of
>> Kafka
>> recently that assumed operations completed in a given order so it is good
>> to confirm such changes are safe.
>>
>> Looking forward to further discussion,
>> Justine
>>
>> On Tue, Jan 2, 2024 at 1:14 PM Philip Nee  wrote:
>>
>> > hey sean - a lot of uses of the Futures are in the public API and
>> therefore
>> > take voting/effort to be changed.  i don't know any reason for
>> > intentionally avoiding the use of CompletableFuture, however, others
>> might
>> > know more than I do.
>> >
>> > thanks,
>> > P
>> >
>> > On Tue, Nov 14, 2023 at 1:27 AM 신수웅(Sean Sin) 
>> > wrote:
>> >
>> > > Dear Apache Kafka Developers,
>> > >
>> > > I'm 4-year SWE in South Korea.
>> > > I have some questions while watching Kafka Producer API.
>> > >
>> > > *Why Use "Future" and Not "CompletableFuture"?*
>> > >
>> > > In the case of "Future", blocking occurs when calling "*get()*", so I
>> > > thought "Computable Future" would be better when doing more
>> asynchronous
>> > > operations.
>> > >
>> > > I looked at the Java API document
>> > > <
>> > >
>> >
>> https://kafka.apache.org/36/javadoc/org/apache/kafka/common/KafkaFuture.html#thenApply(org.apache.kafka.common.KafkaFuture.BaseFunction)
>> > > >
>> > > based on the latest version, version 3.6.x.
>> > >
>> > > If you look at that version, you can see that the Future object
>> provides
>> > > the "toCompletionStage() "method, which can convert "KafkaFuture" to
>> > > "ComputableFuture".
>> > >
>> > > In response to this, I think that in the initial design decision
>> process,
>> > > we considered compatibility issues under JDK 1.8 and the level of
>> > knowledge
>> > > of the learning curve or developer when introducing CompletableFuture,
>> > but I
>> > > wonder if this is correct.
>> > >
>> > > In addition, I wonder if it is recommended to use the
>> > "toCompletionStage()"
>> > > method to produce more non-blocking if we assume JDK 1.8 or higher.
>> > >
>> > > Thanks.
>> > > Su-Ung Shin.
>> > >
>> >
>>
>


Re: [Question] About Kafka producer design decision making

2024-01-02 Thread Ismael Juma
I had written a KIP a while back:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=100829459

Happy for someone to pick it up and drive it forward.

Ismael

On Tue, Jan 2, 2024 at 2:00 PM Justine Olshan 
wrote:

> Hey folks --
>  I think this is a good conversation. Given we plan to drop support for
> Java 8 in 4.0 this seems like a good time to consider this change. Perhaps
> we should file a JIRA ticket and maybe create a KIP to discuss?
>
> One thing we should consider however, is if we want some of these
> operations to be asynchronous. I know I got into some tricky areas of Kafka
> recently that assumed operations completed in a given order so it is good
> to confirm such changes are safe.
>
> Looking forward to further discussion,
> Justine
>
> On Tue, Jan 2, 2024 at 1:14 PM Philip Nee  wrote:
>
> > hey sean - a lot of uses of the Futures are in the public API and
> therefore
> > take voting/effort to be changed.  i don't know any reason for
> > intentionally avoiding the use of CompletableFuture, however, others
> might
> > know more than I do.
> >
> > thanks,
> > P
> >
> > On Tue, Nov 14, 2023 at 1:27 AM 신수웅(Sean Sin) 
> > wrote:
> >
> > > Dear Apache Kafka Developers,
> > >
> > > I'm 4-year SWE in South Korea.
> > > I have some questions while watching Kafka Producer API.
> > >
> > > *Why Use "Future" and Not "CompletableFuture"?*
> > >
> > > In the case of "Future", blocking occurs when calling "*get()*", so I
> > > thought "Computable Future" would be better when doing more
> asynchronous
> > > operations.
> > >
> > > I looked at the Java API document
> > > <
> > >
> >
> https://kafka.apache.org/36/javadoc/org/apache/kafka/common/KafkaFuture.html#thenApply(org.apache.kafka.common.KafkaFuture.BaseFunction)
> > > >
> > > based on the latest version, version 3.6.x.
> > >
> > > If you look at that version, you can see that the Future object
> provides
> > > the "toCompletionStage() "method, which can convert "KafkaFuture" to
> > > "ComputableFuture".
> > >
> > > In response to this, I think that in the initial design decision
> process,
> > > we considered compatibility issues under JDK 1.8 and the level of
> > knowledge
> > > of the learning curve or developer when introducing CompletableFuture,
> > but I
> > > wonder if this is correct.
> > >
> > > In addition, I wonder if it is recommended to use the
> > "toCompletionStage()"
> > > method to produce more non-blocking if we assume JDK 1.8 or higher.
> > >
> > > Thanks.
> > > Su-Ung Shin.
> > >
> >
>


Re: [Question] About Kafka producer design decision making

2024-01-02 Thread Justine Olshan
Hey folks --
 I think this is a good conversation. Given we plan to drop support for
Java 8 in 4.0 this seems like a good time to consider this change. Perhaps
we should file a JIRA ticket and maybe create a KIP to discuss?

One thing we should consider however, is if we want some of these
operations to be asynchronous. I know I got into some tricky areas of Kafka
recently that assumed operations completed in a given order so it is good
to confirm such changes are safe.

Looking forward to further discussion,
Justine

On Tue, Jan 2, 2024 at 1:14 PM Philip Nee  wrote:

> hey sean - a lot of uses of the Futures are in the public API and therefore
> take voting/effort to be changed.  i don't know any reason for
> intentionally avoiding the use of CompletableFuture, however, others might
> know more than I do.
>
> thanks,
> P
>
> On Tue, Nov 14, 2023 at 1:27 AM 신수웅(Sean Sin) 
> wrote:
>
> > Dear Apache Kafka Developers,
> >
> > I'm 4-year SWE in South Korea.
> > I have some questions while watching Kafka Producer API.
> >
> > *Why Use "Future" and Not "CompletableFuture"?*
> >
> > In the case of "Future", blocking occurs when calling "*get()*", so I
> > thought "Computable Future" would be better when doing more asynchronous
> > operations.
> >
> > I looked at the Java API document
> > <
> >
> https://kafka.apache.org/36/javadoc/org/apache/kafka/common/KafkaFuture.html#thenApply(org.apache.kafka.common.KafkaFuture.BaseFunction)
> > >
> > based on the latest version, version 3.6.x.
> >
> > If you look at that version, you can see that the Future object provides
> > the "toCompletionStage() "method, which can convert "KafkaFuture" to
> > "ComputableFuture".
> >
> > In response to this, I think that in the initial design decision process,
> > we considered compatibility issues under JDK 1.8 and the level of
> knowledge
> > of the learning curve or developer when introducing CompletableFuture,
> but I
> > wonder if this is correct.
> >
> > In addition, I wonder if it is recommended to use the
> "toCompletionStage()"
> > method to produce more non-blocking if we assume JDK 1.8 or higher.
> >
> > Thanks.
> > Su-Ung Shin.
> >
>


Re: [Question] About Kafka producer design decision making

2024-01-02 Thread Philip Nee
hey sean - a lot of uses of the Futures are in the public API and therefore
take voting/effort to be changed.  i don't know any reason for
intentionally avoiding the use of CompletableFuture, however, others might
know more than I do.

thanks,
P

On Tue, Nov 14, 2023 at 1:27 AM 신수웅(Sean Sin)  wrote:

> Dear Apache Kafka Developers,
>
> I'm 4-year SWE in South Korea.
> I have some questions while watching Kafka Producer API.
>
> *Why Use "Future" and Not "CompletableFuture"?*
>
> In the case of "Future", blocking occurs when calling "*get()*", so I
> thought "Computable Future" would be better when doing more asynchronous
> operations.
>
> I looked at the Java API document
> <
> https://kafka.apache.org/36/javadoc/org/apache/kafka/common/KafkaFuture.html#thenApply(org.apache.kafka.common.KafkaFuture.BaseFunction)
> >
> based on the latest version, version 3.6.x.
>
> If you look at that version, you can see that the Future object provides
> the "toCompletionStage() "method, which can convert "KafkaFuture" to
> "ComputableFuture".
>
> In response to this, I think that in the initial design decision process,
> we considered compatibility issues under JDK 1.8 and the level of knowledge
> of the learning curve or developer when introducing CompletableFuture, but I
> wonder if this is correct.
>
> In addition, I wonder if it is recommended to use the "toCompletionStage()"
> method to produce more non-blocking if we assume JDK 1.8 or higher.
>
> Thanks.
> Su-Ung Shin.
>


[Question] About Kafka producer design decision making

2023-11-14 Thread Sean Sin
Dear Apache Kafka Developers,

I'm a 4-year SWE in South Korea.
I have some questions about the Kafka Producer API.

*Why use "Future" and not "CompletableFuture"?*

In the case of "Future", blocking occurs when calling "*get()*", so I
thought "CompletableFuture" would be better when doing more asynchronous
operations.

I looked at the Java API document

based on the latest version, 3.6.x.

If you look at that version, you can see that the KafkaFuture object provides
the "toCompletionStage()" method, which can convert a "KafkaFuture" to a
"CompletableFuture".

In response to this, I think that in the initial design decision process,
compatibility with JDK 1.8 and the learning curve for developers introducing
CompletableFuture were the considerations, but I wonder if this is correct.

In addition, I wonder if it is recommended to use the "toCompletionStage()"
method for more non-blocking code if we assume JDK 1.8 or higher.

Thanks.
Su-Ung Shin.
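
For readers following along, a minimal, hedged sketch of the two bridges discussed 
above (illustrative class names; not an official recommendation):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

class NonBlockingSketch {
    // Bridge the producer's Future-returning send() to a CompletableFuture via the callback.
    static CompletableFuture<RecordMetadata> sendAsync(KafkaProducer<String, String> producer,
                                                       ProducerRecord<String, String> record) {
        CompletableFuture<RecordMetadata> result = new CompletableFuture<>();
        producer.send(record, (metadata, exception) -> {
            if (exception != null) result.completeExceptionally(exception);
            else result.complete(metadata);
        });
        return result;
    }

    // On the admin side, KafkaFuture exposes toCompletionStage() in recent releases (e.g. 3.6).
    static CompletionStage<String> clusterIdAsync(Admin admin) {
        return admin.describeCluster().clusterId().toCompletionStage();
    }
}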


Re: question

2023-09-22 Thread Luke Chen
Hi 殿杰

In short, we don't support it now.
But you are welcome to submit a PR to fill the gap.
You can check this ticket for more information:
https://issues.apache.org/jira/browse/KAFKA-7025

Thanks.
Luke

On Sat, Sep 23, 2023 at 2:14 AM shidian...@mxnavi.com 
wrote:

>
> hello,
>
> I'm working in Kafka development. Now I have a question: does Kafka
> support an Android client?
>
>
>
>
> 石殿杰
> Technology Center
> Email: shidian...@mxnavi.com
> Tel: 18341724011
>


Re: [QUESTION] What is the difference between sequence and offset for a Record?

2023-08-09 Thread tison
Thanks for your reply!

I may not use "normalization". What I want to refer to is:

appendInfo.setLastOffset(offset.value - 1)

which underneath updates the base offset field (in record batch) but not
the offset delta of each record.

Best,
tison.


Justine Olshan  于2023年8月8日周二 00:43写道:

> The sequence summary looks right to me.
> For log normalization, are you referring to compaction? The segment's first
> and last offsets might change, but a batch keeps its offsets when
> compaction occurs.
>
> Hope that helps.
> Justine
>
> On Mon, Aug 7, 2023 at 8:59 AM Matthias J. Sax  wrote:
>
> > > but the base offset may change during log normalizing.
> >
> > Not sure what you mean by "normalization" but offsets are immutable, so
> > they don't change. (To be fair, I am not an expert on brokers, so not
> > sure how this works in detail when log compaction kicks in).
> >
> > > This field is given by the producer and the broker should only read it.
> >
> > Sounds right. The point is that the broker has an "expected"
> > value for it, and if the provided value does not match the expected one,
> > the write is rejected to begin with.
> >
> >
> > -Matthias
> >
> > On 8/7/23 6:35 AM, tison wrote:
> > > Hi Matthias and Justine,
> > >
> > > Thanks for your reply!
> > >
> > > I can summarize the answer as -
> > >
> > > Record offset = base offset + offset delta. This field is calculated by
> > the
> > > broker and the delta won't change but the base offset may change during
> > log
> > > normalizing.
> > > Record sequence = base sequence + (offset) delta. This field is given
> by
> > > the producer and the broker should only read it.
> > >
> > > Is it correct?
> > >
> > > I implement the manipulation part of base offset following this
> > > understanding at [1].
> > >
> > > Best,
> > > tison.
> > >
> > > [1]
> > >
> >
> https://github.com/tisonkun/kafka-api/blob/d080ab7e4b57c0ab0182e0b254333f400e616cd2/simplesrv/src/lib.rs#L391-L394
> > >
> > >
> > > Justine Olshan  于2023年8月2日周三 04:19写道:
> > >
> > >> For what it's worth -- the sequence number is not calculated
> > >> "baseOffset/baseSequence + offset delta" but rather by monotonically
> > >> increasing for a given epoch. If the epoch is bumped, we reset back to
> > >> zero.
> > >> This may mean that the offset and sequence may match, but do not
> > strictly
> > >> need to be the same. The sequence number will also always come from
> the
> > >> client and be in the produce records sent to the Kafka broker.
> > >>
> > >> As for offsets, there is some code in the log layer that maintains the
> > log
> > >> end offset and assigns offsets to the records. The produce handling on
> > the
> > >> leader should typically assign the offset.
> > >> I believe you can find that code here:
> > >>
> > >>
> >
> https://github.com/apache/kafka/blob/b9a45546a7918799b6fb3c0fe63b56f47d8fcba9/core/src/main/scala/kafka/log/UnifiedLog.scala#L766
> > >>
> > >> Justine
> > >>
> > >> On Tue, Aug 1, 2023 at 11:38 AM Matthias J. Sax 
> > wrote:
> > >>
> > >>> The _offset_ is the position of the record in the partition.
> > >>>
> > >>> The _sequence number_ is a unique ID that allows broker to
> de-duplicate
> > >>> messages. It requires the producer to implement the idempotency
> > protocol
> > >>> (part of Kafka transactions); thus, sequence numbers are optional and
> > as
> > >>> long as you don't want to support idempotent writes, you don't need
> to
> > >>> worry about them. (If you want to dig into details, checkout KIP-98
> > that
> > >>> is the original KIP about Kafka TX).
> > >>>
> > >>> HTH,
> > >>> -Matthias
> > >>>
> > >>> On 8/1/23 2:19 AM, tison wrote:
> > >>>> Hi,
> > >>>>
> > >>>> I'm writing a Kafka API Rust codec library[1] to understand how
> Kafka
> > >>>> models its concepts and how the core business logic works.
> > >>>>
> > >>>> While implementing the codec for Records[2], I saw a pair of
> fields
> > >>>> "sequence" and "offset". Both of them are calculated by
> > >>>> baseOffset/baseSequence + offset delta. Then I'm a bit confused how
> to
> > >>> deal
> > >>>> with them properly - what's the difference between these two
> concepts
> > >>>> logically?
> > >>>>
> > >>>> Also, to understand how the core business logic works, I write a
> > simple
> > >>>> server based on my codec library, and observe that the server may
> need
> > >> to
> > >>>> update offset for records produced. How does Kafka set the correct
> > >> offset
> > >>>> for each produced records? And how does Kafka maintain the
> calculation
> > >>> for
> > >>>> offset and sequence during these modifications?
> > >>>>
> > >>>> I'll appreciate if anyone can answer the question or give some
> > insights
> > >>> :D
> > >>>>
> > >>>> Best,
> > >>>> tison.
> > >>>>
> > >>>> [1] https://github.com/tisonkun/kafka-api
> > >>>> [2] https://kafka.apache.org/documentation/#messageformat
> > >>>>
> > >>>
> > >>
> > >
> >
>


Re: [QUESTION] about topic dimension quota

2023-08-08 Thread hudeqi
In fact, I have implemented a bytesIn/bytesOut limit at the topic dimension. 
I don't know the community's attitude towards this feature, so I don't know 
whether I should propose a KIP and contribute it.

best,
hudeqi


 -----Original Message-----
 From: hudeqi 16120...@bjtu.edu.cn
 Sent: 2023-08-08 21:10:39 (Tuesday)
 To: dev@kafka.apache.org
 Cc: 
 Subject: [QUESTION] about topic dimension quota
 


[QUESTION] about topic dimension quota

2023-08-08 Thread hudeqi
Hi all. Let me ask a question first: do we plan to support quotas in 
the topic dimension?

Re: [QUESTION] What is the difference between sequence and offset for a Record?

2023-08-07 Thread Justine Olshan
The sequence summary looks right to me.
For log normalization, are you referring to compaction? The segment's first
and last offsets might change, but a batch keeps its offsets when
compaction occurs.

Hope that helps.
Justine

On Mon, Aug 7, 2023 at 8:59 AM Matthias J. Sax  wrote:

> > but the base offset may change during log normalizing.
>
> Not sure what you mean by "normalization" but offsets are immutable, so
> they don't change. (To be fair, I am not an expert on brokers, so not
> sure how this works in detail when log compaction kicks in).
>
> > This field is given by the producer and the broker should only read it.
>
> Sounds right. The point is that the broker has an "expected"
> value for it, and if the provided value does not match the expected one,
> the write is rejected to begin with.
>
>
> -Matthias
>
> On 8/7/23 6:35 AM, tison wrote:
> > Hi Matthias and Justine,
> >
> > Thanks for your reply!
> >
> > I can summarize the answer as -
> >
> > Record offset = base offset + offset delta. This field is calculated by
> the
> > broker and the delta won't change but the base offset may change during
> log
> > normalizing.
> > Record sequence = base sequence + (offset) delta. This field is given by
> > the producer and the broker should only read it.
> >
> > Is it correct?
> >
> > I implement the manipulation part of base offset following this
> > understanding at [1].
> >
> > Best,
> > tison.
> >
> > [1]
> >
> https://github.com/tisonkun/kafka-api/blob/d080ab7e4b57c0ab0182e0b254333f400e616cd2/simplesrv/src/lib.rs#L391-L394
> >
> >
> > Justine Olshan  于2023年8月2日周三 04:19写道:
> >
> >> For what it's worth -- the sequence number is not calculated
> >> "baseOffset/baseSequence + offset delta" but rather by monotonically
> >> increasing for a given epoch. If the epoch is bumped, we reset back to
> >> zero.
> >> This may mean that the offset and sequence may match, but do not
> strictly
> >> need to be the same. The sequence number will also always come from the
> >> client and be in the produce records sent to the Kafka broker.
> >>
> >> As for offsets, there is some code in the log layer that maintains the
> log
> >> end offset and assigns offsets to the records. The produce handling on
> the
> >> leader should typically assign the offset.
> >> I believe you can find that code here:
> >>
> >>
> https://github.com/apache/kafka/blob/b9a45546a7918799b6fb3c0fe63b56f47d8fcba9/core/src/main/scala/kafka/log/UnifiedLog.scala#L766
> >>
> >> Justine
> >>
> >> On Tue, Aug 1, 2023 at 11:38 AM Matthias J. Sax 
> wrote:
> >>
> >>> The _offset_ is the position of the record in the partition.
> >>>
> >>> The _sequence number_ is a unique ID that allows broker to de-duplicate
> >>> messages. It requires the producer to implement the idempotency
> protocol
> >>> (part of Kafka transactions); thus, sequence numbers are optional and
> as
> >>> long as you don't want to support idempotent writes, you don't need to
> >>> worry about them. (If you want to dig into details, checkout KIP-98
> that
> >>> is the original KIP about Kafka TX).
> >>>
> >>> HTH,
> >>> -Matthias
> >>>
> >>> On 8/1/23 2:19 AM, tison wrote:
> >>>> Hi,
> >>>>
> >>>> I'm writing a Kafka API Rust codec library[1] to understand how Kafka
> >>>> models its concepts and how the core business logic works.
> >>>>
> >>>> While implementing the codec for Records[2], I saw a pair of fields
> >>>> "sequence" and "offset". Both of them are calculated by
> >>>> baseOffset/baseSequence + offset delta. Then I'm a bit confused how to
> >>> deal
> >>>> with them properly - what's the difference between these two concepts
> >>>> logically?
> >>>>
> >>>> Also, to understand how the core business logic works, I write a
> simple
> >>>> server based on my codec library, and observe that the server may need
> >> to
> >>>> update offset for records produced. How does Kafka set the correct
> >> offset
> >>>> for each produced records? And how does Kafka maintain the calculation
> >>> for
> >>>> offset and sequence during these modifications?
> >>>>
> >>>> I'll appreciate if anyone can answer the question or give some
> insights
> >>> :D
> >>>>
> >>>> Best,
> >>>> tison.
> >>>>
> >>>> [1] https://github.com/tisonkun/kafka-api
> >>>> [2] https://kafka.apache.org/documentation/#messageformat
> >>>>
> >>>
> >>
> >
>


Re: [QUESTION] What is the difference between sequence and offset for a Record?

2023-08-07 Thread Matthias J. Sax

but the base offset may change during log normalizing.


Not sure what you mean by "normalization" but offsets are immutable, so 
they don't change. (To be fair, I am not an expert on brokers, so not 
sure how this works in detail when log compaction kicks in). 



This field is given by the producer and the broker should only read it.


Sounds right. The point is that the broker has an "expected" 
value for it, and if the provided value does not match the expected one, 
the write is rejected to begin with.



-Matthias

On 8/7/23 6:35 AM, tison wrote:

Hi Matthias and Justine,

Thanks for your reply!

I can summarize the answer as -

Record offset = base offset + offset delta. This field is calculated by the
broker and the delta won't change but the base offset may change during log
normalizing.
Record sequence = base sequence + (offset) delta. This field is given by
the producer and the broker should only read it.

Is it correct?

I implement the manipulation part of base offset following this
understanding at [1].

Best,
tison.

[1]
https://github.com/tisonkun/kafka-api/blob/d080ab7e4b57c0ab0182e0b254333f400e616cd2/simplesrv/src/lib.rs#L391-L394


Justine Olshan  于2023年8月2日周三 04:19写道:


For what it's worth -- the sequence number is not calculated
"baseOffset/baseSequence + offset delta" but rather by monotonically
increasing for a given epoch. If the epoch is bumped, we reset back to
zero.
This may mean that the offset and sequence may match, but do not strictly
need to be the same. The sequence number will also always come from the
client and be in the produce records sent to the Kafka broker.

As for offsets, there is some code in the log layer that maintains the log
end offset and assigns offsets to the records. The produce handling on the
leader should typically assign the offset.
I believe you can find that code here:

https://github.com/apache/kafka/blob/b9a45546a7918799b6fb3c0fe63b56f47d8fcba9/core/src/main/scala/kafka/log/UnifiedLog.scala#L766

Justine

On Tue, Aug 1, 2023 at 11:38 AM Matthias J. Sax  wrote:


The _offset_ is the position of the record in the partition.

The _sequence number_ is a unique ID that allows broker to de-duplicate
messages. It requires the producer to implement the idempotency protocol
(part of Kafka transactions); thus, sequence numbers are optional and as
long as you don't want to support idempotent writes, you don't need to
worry about them. (If you want to dig into details, checkout KIP-98 that
is the original KIP about Kafka TX).

HTH,
-Matthias

On 8/1/23 2:19 AM, tison wrote:

Hi,

I'm writing a Kafka API Rust codec library[1] to understand how Kafka
models its concepts and how the core business logic works.

While implementing the codec for Records[2], I saw a pair of fields
"sequence" and "offset". Both of them are calculated by
baseOffset/baseSequence + offset delta. Then I'm a bit confused how to

deal

with them properly - what's the difference between these two concepts
logically?

Also, to understand how the core business logic works, I write a simple
server based on my codec library, and observe that the server may need

to

update offset for records produced. How does Kafka set the correct

offset

for each produced records? And how does Kafka maintain the calculation

for

offset and sequence during these modifications?

I'll appreciate if anyone can answer the question or give some insights

:D


Best,
tison.

[1] https://github.com/tisonkun/kafka-api
[2] https://kafka.apache.org/documentation/#messageformat









Re: [QUESTION] What is the difference between sequence and offset for a Record?

2023-08-07 Thread tison
Hi Matthias and Justine,

Thanks for your reply!

I can summarize the answer as -

Record offset = base offset + offset delta. This field is calculated by the
broker and the delta won't change but the base offset may change during log
normalizing.
Record sequence = base sequence + (offset) delta. This field is given by
the producer and the broker should only read it.

Is it correct?

I implement the manipulation part of base offset following this
understanding at [1].

Best,
tison.

[1]
https://github.com/tisonkun/kafka-api/blob/d080ab7e4b57c0ab0182e0b254333f400e616cd2/simplesrv/src/lib.rs#L391-L394


Justine Olshan  于2023年8月2日周三 04:19写道:

> For what it's worth -- the sequence number is not calculated
> "baseOffset/baseSequence + offset delta" but rather by monotonically
> increasing for a given epoch. If the epoch is bumped, we reset back to
> zero.
> This may mean that the offset and sequence may match, but do not strictly
> need to be the same. The sequence number will also always come from the
> client and be in the produce records sent to the Kafka broker.
>
> As for offsets, there is some code in the log layer that maintains the log
> end offset and assigns offsets to the records. The produce handling on the
> leader should typically assign the offset.
> I believe you can find that code here:
>
> https://github.com/apache/kafka/blob/b9a45546a7918799b6fb3c0fe63b56f47d8fcba9/core/src/main/scala/kafka/log/UnifiedLog.scala#L766
>
> Justine
>
> On Tue, Aug 1, 2023 at 11:38 AM Matthias J. Sax  wrote:
>
> > The _offset_ is the position of the record in the partition.
> >
> > The _sequence number_ is a unique ID that allows broker to de-duplicate
> > messages. It requires the producer to implement the idempotency protocol
> > (part of Kafka transactions); thus, sequence numbers are optional and as
> > long as you don't want to support idempotent writes, you don't need to
> > worry about them. (If you want to dig into details, checkout KIP-98 that
> > is the original KIP about Kafka TX).
> >
> > HTH,
> >-Matthias
> >
> > On 8/1/23 2:19 AM, tison wrote:
> > > Hi,
> > >
> > > I'm writing a Kafka API Rust codec library[1] to understand how Kafka
> > > models its concepts and how the core business logic works.
> > >
> > > While implementing the codec for Records[2], I saw a pair of fields
> > > "sequence" and "offset". Both of them are calculated by
> > > baseOffset/baseSequence + offset delta. Then I'm a bit confused how to
> > deal
> > > with them properly - what's the difference between these two concepts
> > > logically?
> > >
> > > Also, to understand how the core business logic works, I write a simple
> > > server based on my codec library, and observe that the server may need
> to
> > > update offset for records produced. How does Kafka set the correct
> offset
> > > for each produced records? And how does Kafka maintain the calculation
> > for
> > > offset and sequence during these modifications?
> > >
> > > I'll appreciate if anyone can answer the question or give some insights
> > :D
> > >
> > > Best,
> > > tison.
> > >
> > > [1] https://github.com/tisonkun/kafka-api
> > > [2] https://kafka.apache.org/documentation/#messageformat
> > >
> >
>


Re: [QUESTION] What is the difference between sequence and offset for a Record?

2023-08-01 Thread Justine Olshan
For what it's worth -- the sequence number is not calculated
"baseOffset/baseSequence + offset delta" but rather by monotonically
increasing for a given epoch. If the epoch is bumped, we reset back to zero.
This may mean that the offset and sequence may match, but do not strictly
need to be the same. The sequence number will also always come from the
client and be in the produce records sent to the Kafka broker.

As for offsets, there is some code in the log layer that maintains the log
end offset and assigns offsets to the records. The produce handling on the
leader should typically assign the offset.
I believe you can find that code here:
https://github.com/apache/kafka/blob/b9a45546a7918799b6fb3c0fe63b56f47d8fcba9/core/src/main/scala/kafka/log/UnifiedLog.scala#L766

Justine

On Tue, Aug 1, 2023 at 11:38 AM Matthias J. Sax  wrote:

> The _offset_ is the position of the record in the partition.
>
> The _sequence number_ is a unique ID that allows broker to de-duplicate
> messages. It requires the producer to implement the idempotency protocol
> (part of Kafka transactions); thus, sequence numbers are optional and as
> long as you don't want to support idempotent writes, you don't need to
> worry about them. (If you want to dig into details, checkout KIP-98 that
> is the original KIP about Kafka TX).
>
> HTH,
>-Matthias
>
> On 8/1/23 2:19 AM, tison wrote:
> > Hi,
> >
> > I'm writing a Kafka API Rust codec library[1] to understand how Kafka
> > models its concepts and how the core business logic works.
> >
> > While implementing the codec for Records[2], I saw a pair of fields
> > "sequence" and "offset". Both of them are calculated by
> > baseOffset/baseSequence + offset delta. Then I'm a bit confused how to
> deal
> > with them properly - what's the difference between these two concepts
> > logically?
> >
> > Also, to understand how the core business logic works, I write a simple
> > server based on my codec library, and observe that the server may need to
> > update offset for records produced. How does Kafka set the correct offset
> > for each produced records? And how does Kafka maintain the calculation
> for
> > offset and sequence during these modifications?
> >
> > I'll appreciate if anyone can answer the question or give some insights
> :D
> >
> > Best,
> > tison.
> >
> > [1] https://github.com/tisonkun/kafka-api
> > [2] https://kafka.apache.org/documentation/#messageformat
> >
>


Re: [QUESTION] What is the difference between sequence and offset for a Record?

2023-08-01 Thread Matthias J. Sax

The _offset_ is the position of the record in the partition.

The _sequence number_ is a unique ID that allows broker to de-duplicate 
messages. It requires the producer to implement the idempotency protocol 
(part of Kafka transactions); thus, sequence numbers are optional and as 
long as you don't want to support idempotent writes, you don't need to 
worry about them. (If you want to dig into details, checkout KIP-98 that 
is the original KIP about Kafka TX).


HTH,
  -Matthias

On 8/1/23 2:19 AM, tison wrote:

Hi,

I'm writing a Kafka API Rust codec library[1] to understand how Kafka
models its concepts and how the core business logic works.

While implementing the codec for Records[2], I saw a pair of fields
"sequence" and "offset". Both of them are calculated by
baseOffset/baseSequence + offset delta. Then I'm a bit confused how to deal
with them properly - what's the difference between these two concepts
logically?

Also, to understand how the core business logic works, I write a simple
server based on my codec library, and observe that the server may need to
update offset for records produced. How does Kafka set the correct offset
for each produced records? And how does Kafka maintain the calculation for
offset and sequence during these modifications?

I'll appreciate if anyone can answer the question or give some insights :D

Best,
tison.

[1] https://github.com/tisonkun/kafka-api
[2] https://kafka.apache.org/documentation/#messageformat



[QUESTION] What is the difference between sequence and offset for a Record?

2023-08-01 Thread tison
Hi,

I'm writing a Kafka API Rust codec library[1] to understand how Kafka
models its concepts and how the core business logic works.

While implementing the codec for Records[2], I saw a pair of fields
"sequence" and "offset". Both of them are calculated by
baseOffset/baseSequence + offset delta. Then I'm a bit confused how to deal
with them properly - what's the difference between these two concepts
logically?

Also, to understand how the core business logic works, I write a simple
server based on my codec library, and observe that the server may need to
update offset for records produced. How does Kafka set the correct offset
for each produced records? And how does Kafka maintain the calculation for
offset and sequence during these modifications?

I'll appreciate if anyone can answer the question or give some insights :D

Best,
tison.

[1] https://github.com/tisonkun/kafka-api
[2] https://kafka.apache.org/documentation/#messageformat
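
Summarizing the replies above as a minimal sketch (illustrative names, not the 
broker's actual classes): a record's offset is the batch base offset, which the 
broker assigns at append time, plus the record's offset delta, while its sequence is 
the client-assigned base sequence plus the same delta.

// Illustrative only.
final class RecordBatchSketch {
    final long baseOffset;   // assigned by the broker at append time (the log end offset)
    final int baseSequence;  // assigned by the idempotent producer; -1 when idempotence is off

    RecordBatchSketch(long baseOffset, int baseSequence) {
        this.baseOffset = baseOffset;
        this.baseSequence = baseSequence;
    }

    long recordOffset(int offsetDelta) {
        return baseOffset + offsetDelta;  // position of the record in the partition
    }

    int recordSequence(int offsetDelta) {
        // producer-side de-duplication id; independent of where the batch lands in the log
        return baseSequence < 0 ? -1 : baseSequence + offsetDelta;
    }
}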


Question about Lag Calculations in Apache Kafka Source Code

2023-07-25 Thread Henry GALVEZ
Hi everyone,

I am interested in understanding how the broker performs lag calculations. 
Specifically, I would like to explore the possibility of improving the 
calculation method to use the latest stable offset instead of the latest offset.

I noticed that there might be differences between the results obtained from 
AdminClient.listOffsets and AdminClient.listConsumerGroupOffsets, and I believe 
investigating this area in the source code might shed some light on potential 
optimizations.

Could you please guide me to the specific part of the Apache Kafka source code 
where the lag calculations are performed? I would greatly appreciate any 
insights or pointers you can provide to help me get started with my 
investigation.

Thank you in advance for your assistance. Looking forward to hearing from you.

Best regards,
Henry
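
For reference, a hedged sketch of the client-side arithmetic (not the broker code 
asked about): lag is typically the partition's latest offset from listOffsets minus 
the group's committed offset from listConsumerGroupOffsets. The group id and error 
handling here are illustrative.

import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class LagSketch {
    static void printLag(Admin admin, String groupId) throws Exception {
        // committed offsets per partition for the group
        Map<TopicPartition, OffsetAndMetadata> committed =
            admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();

        // latest offset for each of those partitions
        Map<TopicPartition, OffsetSpec> request = committed.keySet().stream()
            .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
            admin.listOffsets(request).all().get();

        committed.forEach((tp, meta) ->
            System.out.printf("%s lag=%d%n", tp, latest.get(tp).offset() - meta.offset()));
    }
}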

Re: Question ❓

2023-05-10 Thread Matthias J. Sax

Partitions are not for different users.

If you want to isolate users, you would do it at the topic level. You 
could use ACLs to grant access to different topics: 
https://kafka.apache.org/documentation/#security_authz
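
For example, a hedged sketch with the standard ACL tool (assuming an authorizer is 
configured on the brokers; principals, topic and group names are illustrative):

bin/kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:alice \
  --operation Read --operation Describe \
  --topic alice-events --group alice-app

bin/kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:bob \
  --operation Read --operation Describe \
  --topic bob-events --group bob-app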



-Matthias

On 5/9/23 11:11 AM, влад тасканов wrote:


Hi. I recently started studying Kafka and a question came up. Is it possible 
to have a separate queue for each user? As I understand it, there is a broker 
with different topics, and each topic would have a number of partitions equal to 
the number of users. If yes, can you link to an example or explanation? Google 
didn't help me.


Re: Question ❓

2023-05-10 Thread hudeqi
The current request queue is monolithic. In fact, there will be many 
performance problems when the business scenarios on a single cluster become 
complicated. Not only dividing by user, but also isolating by request 
category; this is just my idea.

best,
hudeqi


 -----Original Message-----
 From: "влад тасканов" 
 Sent: 2023-05-10 02:11:21 (Wednesday)
 To: dev@kafka.apache.org
 Cc: 
 Subject: Question ❓
 


Question ❓

2023-05-10 Thread влад тасканов

Hi. I recently started studying Kafka and a question came up. Is it possible 
to have a separate queue for each user? As I understand it, there is a broker 
with different topics, and each topic would have a number of partitions equal to 
the number of users. If yes, can you link to an example or explanation? Google 
didn't help me.

Re: Question: CI Pipeline in Kafka GitHub Pull Requests & Github Actions Usage

2023-05-05 Thread aaron ai
Dear David:

I greatly appreciate your welcome and the explanation that you offered
regarding the Jenkins system!

I will try to familiarize myself with the standards and contribution
process of the Kafka community, and look forward to making some
contributions in the coming days.

Best regards,
Aaron

On Thu, May 4, 2023 at 11:47 PM David Arthur
 wrote:

> Hello, Aaron and welcome to the project!
>
> If you look towards the bottom of a Pull Request there will be a section
> reporting the status of the Checks for the latest commit.
> There is a "Details" link that takes you to the Jenkins job for that PR.
>
> The default Jenkins view is the new UI (called Blue Ocean I think?). The
> "Classic" Jenkins view is also available by clicking the button near the
> top:
>
> [image: image.png]
>
> From the Classic view it's pretty straightforward to download the console
> log for the job to see what went wrong.
>
> Our build steps are defined in the Jenkinsfile
>  checked into the
> repository. We do have a compile + static analysis step that happens before
> running tests.
>
> -David
>
> On Thu, May 4, 2023 at 5:34 AM aaron ai  wrote:
>
>> Hey folks,
>>
>> I hope this email finds you well. My name is Aaron Ai, and I'm a beginner
>> interested in the Kafka community. I am eager to contribute to Kafka's
>> development and be part of this amazing project.
>>
>> However, I have noticed some confusion surrounding the CI pipeline in
>> Kafka
>> GitHub pull requests. It appears that the builds often fail, and the
>> results are not very clear to analyse. How can I determine if a pull
>> request build is okay for now, given the current CI pipeline situation?
>> Furthermore, is it a good idea to use Github Action to help with some
>> static checks? If so, I would be more than happy to contribute. By the
>> way,
>> is there any plan to migrate to Github Actions in the near future?
>>
>> I eagerly await your response and appreciate any feedback you might have
>> on
>> this matter. Thank you for your time and consideration.
>>
>> Best regards,
>> Aaron
>>
>
>
> --
> -David
>


Re: Question: CI Pipeline in Kafka GitHub Pull Requests & Github Actions Usage

2023-05-04 Thread David Arthur
Hello, Aaron and welcome to the project!

If you look towards the bottom of a Pull Request there will be a section
reporting the status of the Checks for the latest commit.
There is a "Details" link that takes you to the Jenkins job for that PR.

The default Jenkins view is the new UI (called Blue Ocean I think?). The
"Classic" Jenkins view is also available by clicking the button near the
top:

[image: image.png]

From the Classic view it's pretty straightforward to download the console
log for the job to see what went wrong.

Our build steps are defined in the Jenkinsfile
 checked into the
repository. We do have a compile + static analysis step that happens before
running tests.

-David

On Thu, May 4, 2023 at 5:34 AM aaron ai  wrote:

> Hey folks,
>
> I hope this email finds you well. My name is Aaron Ai, and I'm a beginner
> interested in the Kafka community. I am eager to contribute to Kafka's
> development and be part of this amazing project.
>
> However, I have noticed some confusion surrounding the CI pipeline in Kafka
> GitHub pull requests. It appears that the builds often fail, and the
> results are not very clear to analyse. How can I determine if a pull
> request build is okay for now, given the current CI pipeline situation?
> Furthermore, is it a good idea to use Github Action to help with some
> static checks? If so, I would be more than happy to contribute. By the way,
> is there any plan to migrate to Github Actions in the near future?
>
> I eagerly await your response and appreciate any feedback you might have on
> this matter. Thank you for your time and consideration.
>
> Best regards,
> Aaron
>


-- 
-David


Question: CI Pipeline in Kafka GitHub Pull Requests & Github Actions Usage

2023-05-04 Thread aaron ai
Hey folks,

I hope this email finds you well. My name is Aaron Ai, and I'm a beginner
interested in the Kafka community. I am eager to contribute to Kafka's
development and be part of this amazing project.

However, I have noticed some confusion surrounding the CI pipeline in Kafka
GitHub pull requests. It appears that the builds often fail, and the
results are not very clear to analyse. How can I determine if a pull
request build is okay for now, given the current CI pipeline situation?
Furthermore, is it a good idea to use Github Action to help with some
static checks? If so, I would be more than happy to contribute. By the way,
is there any plan to migrate to Github Actions in the near future?

I eagerly await your response and appreciate any feedback you might have on
this matter. Thank you for your time and consideration.

Best regards,
Aaron


A question about topic limit in produceBytesIn

2023-04-13 Thread hudeqi
Another question about using kafka recently:


Currently, Kafka's write throttling only supports the user and clientId dimensions 
of a request. In practice, the following situation is often encountered: a topic's 
incoming traffic suddenly increases and a resource bottleneck is about to be reached, 
so it needs to be restricted; but the user and/or clientId writing to this topic also 
send data to other topics in this cluster, which may affect those topics, so it is 
impossible to apply an effective restriction to a single topic. Do we have plans to 
add throttling in the topic dimension?
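
(For reference, a hedged sketch of the quota dimensions that do exist today, using 
the standard config tool; entity names and byte rates are illustrative.)

# client-id dimension
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --add-config 'producer_byte_rate=10485760,consumer_byte_rate=20971520' \
  --entity-type clients --entity-name hot-producer

# user dimension
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --add-config 'producer_byte_rate=10485760' \
  --entity-type users --entity-name alice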

A question about the reassign task in kafka

2023-04-13 Thread hudeqi
One question came up in the process of using Kafka recently:

Why does Kafka start fetching from the leader's logStartOffset when doing a topic 
reassignment task (such as expanding the cluster with new brokers)? If the amount of 
data stored locally by the partition leader is large, the disk IO will be saturated. 
Although there is a corresponding throttle mechanism, it is difficult to choose an 
appropriate limiting threshold, and if the setting is not accurate there will still 
be an instantaneous hit to the disk.
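
(For reference, a hedged sketch of how the existing throttle is usually applied when 
executing a reassignment; the broker address, file name, and byte rate are 
illustrative.)

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file reassignment.json --execute --throttle 50000000

# once the reassignment finishes, the same tool clears the throttle
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file reassignment.json --verify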

I have an idea: when doing the reassignment task, the new replica could start 
fetching data from either logEndOffset or logStartOffset according to some strategy 
(I haven't thought it through yet, but it should not be very complicated). For 
example, when the leader's local data volume is judged to be small, fetching can 
start directly from logStartOffset so that the reassignment completes quickly. If 
the leader's local data volume is huge, the fetch can start from logEndOffset, and 
once the new replica has accumulated data covering the retention time it can be 
added to the ISR to complete the reassignment.

What do you think about this issue?




best,

hudeqi

Kafka General Question

2023-03-07 Thread Rohan Kar
Hello Kafka Team
I am trying to do a POC of the messaging, and it is working fine with a remote host 
and client. But I have a question: how can I send a message/response from the 
consumer back to the producer or server using a topic? Please guide me on this and 
share any docs if possible.

Thank You
Rohan Kar
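
A hedged sketch of the usual pattern, where the consumer side writes its response to 
a separate reply topic with its own producer; topic names, configs, and the 
correlation-by-key scheme are illustrative:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class RequestReplySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");
        props.put("group.id", "request-handlers");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            consumer.subscribe(List.of("requests"));
            while (true) {
                for (ConsumerRecord<String, String> req : consumer.poll(Duration.ofSeconds(1))) {
                    // Key the reply with the request key so the requester can correlate it.
                    producer.send(new ProducerRecord<>("replies", req.key(), "handled: " + req.value()));
                }
            }
        }
    }
}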


i have question to kafka error

2023-03-02 Thread SeokYoung Jang

Hi~ 

I have a question. 

I installed Kafka on Azure Kubernetes Service, 

but an error occurs. 

-- error --
{"thread_name":"controller-event-thread","msg":"[ReplicaStateMachine 
controllerId=0] Triggering offline replica state 
changes","level":"INFO","time":"2023-02-27T00:09:39.590Z"}
{"thread_name":"kafka-log-cleaner-thread-0","msg":"Failed to clean up log for 
__consumer_offsets-4 in dir /opt/kafka/data/topics due to 
IOException","level":"ERROR","time":"2023-02-27T00:09:39.905Z"}
java.nio.file.FileSystemException: 
/opt/kafka/data/topics/__consumer_offsets-4/.log.cleaned: 
Operation not permitted
at 
java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:100)
at 
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
at 
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
at 
java.base/sun.nio.fs.UnixFileAttributeViews$Basic.setTimes(UnixFileAttributeViews.java:125)
at java.base/java.nio.file.Files.setLastModifiedTime(Files.java:2355)
at kafka.log.LogSegment.lastModified_$eq(LogSegment.scala:650)
at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:609)
at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:538)
at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:537)
at scala.collection.immutable.List.foreach(List.scala:333)
at kafka.log.Cleaner.doClean(LogCleaner.scala:537)
at kafka.log.Cleaner.clean(LogCleaner.scala:511)
at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:380)
at 
kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:352)
at 
kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:332)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:321)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{"thread_name":"kafka-log-cleaner-thread-0","msg":"Failed to clean up log for 
__consumer_offsets-4 in dir /opt/kafka/data/topics due to 
IOException","level":"ERROR","time":"2023-02-27T00:09:39.905Z"}
java.nio.file.FileSystemException: 
/opt/kafka/data/topics/__consumer_offsets-4/.log.cleaned: 
Operation not permitted
at 
java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:100)
at 
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
at 
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
at 
java.base/sun.nio.fs.UnixFileAttributeViews$Basic.setTimes(UnixFileAttributeViews.java:125)
at java.base/java.nio.file.Files.setLastModifiedTime(Files.java:2355)
at kafka.log.LogSegment.lastModified_$eq(LogSegment.scala:650)
at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:609)
at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:538)
at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:537)
at scala.collection.immutable.List.foreach(List.scala:333)
at kafka.log.Cleaner.doClean(LogCleaner.scala:537)
at kafka.log.Cleaner.clean(LogCleaner.scala:511)
at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:380)
at 
kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:352)
at 
kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:332)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:321)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{"thread_name":"LogDirFailureHandler","msg":"[ReplicaManager broker=0] Stopping 
serving replicas in dir 
/opt/kafka/data/topics","level":"WARN","time":"2023-02-27T00:09:39.910Z"}
{"thread_name":"LogDirFailureHandler","msg":"[ReplicaManager broker=0] Stopping 
serving replicas in dir 
/opt/kafka/data/topics","level":"WARN","time":"2023-02-27T00:09:39.910Z"}
{"thread_name":"LogDirFailureHandler","msg":"[ReplicaManager broker=0] Broker 0 
stopped fetcher for partitions  and stopped moving logs for partitions  because 
they are in the failed log directory 
/opt/kafka/data/topics.","level":"WARN","time":"2023-02-27T00:09:39.920Z"}
{"thread_name":"LogDirFailureHandler","msg":"[ReplicaManager broker=0] Broker 0 
stopped fetcher for partitions  and stopped moving logs for partitions  because 
they are in the failed log directory 
/opt/kafka/d

Re: question

2022-10-10 Thread deng ziming
Hello jincheng,
Kafka provides public Java Producer/Consumer/Admin APIs, so you can access
Kafka if you are using Java to develop an Android app, even though it's not
common. For example, you can develop an Android app that fetches Kafka
metadata or sends tracking-event logs to Kafka; however, a better solution is
to do these jobs on a server and access that server from your app.

--
Best,
Ziming

On Tue, Oct 11, 2022 at 12:41 AM 关锦成 <865617...@qq.com.invalid> wrote:

> Does kafka support Android client?


Question about plugins / extension API

2022-08-03 Thread Taras Ledkov
Hi colleagues, 

I tried to find the answer in the mailing list archive and on the Internet, but 
found nothing.
I see that the Kafka project hasn't introduced a general extension/plugin API for 
three major releases.
I mean a top-level plugin/extension point that allows adding some user-specific code 
at the broker server.

e.g.:
trait Plugin {
  def startup(server: Server): Unit
  def shutdown(): Unit
}

and load plugins via ServiceLoader in `Kafka.main` after the server starts.

Does this approach conflict with Kafka's design? There are projects that have 
been forced to fork the main class `Kafka` in order to implement this 
functionality.

--
With best regards,
Taras Ledkov


Re: Question about the Log Compaction

2022-03-02 Thread Jun Rao
Hi, Liang,

Currently, we store the MD5 of the record key in OffsetMap. Since it has a
large domain (16 bytes), we assume there is no collision there.

Thanks,

Jun
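
A simplified sketch of the idea (illustrative only, not the broker's actual 
SkimpyOffsetMap): the cleaner keys its dedup map by the 16-byte MD5 digest of the 
record key, so two distinct keys only clash if their digests collide, which is 
assumed not to happen.

import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

class OffsetMapSketch {
    private final MessageDigest md5;
    private final Map<ByteBuffer, Long> latestOffsetByKeyHash = new HashMap<>();

    OffsetMapSketch() throws NoSuchAlgorithmException {
        this.md5 = MessageDigest.getInstance("MD5");
    }

    // remember the highest offset seen for this key's digest
    void put(byte[] key, long offset) {
        latestOffsetByKeyHash.merge(ByteBuffer.wrap(md5.digest(key)), offset, Math::max);
    }

    // during cleaning, a record survives only if it is at (or after) the latest offset for its key
    boolean shouldRetain(byte[] key, long offset) {
        Long latest = latestOffsetByKeyHash.get(ByteBuffer.wrap(md5.digest(key)));
        return latest == null || offset >= latest;
    }
}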

On Wed, Mar 2, 2022 at 1:20 AM 阮良  wrote:

> Hi all
>
> I am confused about the log compaction logic, which uses an OffsetMap
> to deduplicate the log. In my opinion, when there is a hash conflict,
> data may be lost.
> Eg: Record1(key1, offset1), Record2(key2, offset2)
> Condition: hash(key1) == hash(key2) && (offset1 < offset2)
> *Result: Record1 will be removed by mistake*
>
>
>- Did I misunderstand the implementation logic? Please give me some
>guidance, thank you very much
>
>
> *1: The OffsetMap put logic does not deal with hash collisions; if
> hash(key1) == hash(key2), key1 will be overwritten*
>
>
>
>
> 2: the logic for retaining records
>
>


Question about the Log Compaction

2022-03-02 Thread 阮良
Hi all 


I am confused about the log compaction logic, which uses an OffsetMap to 
deduplicate the log. In my opinion, when there is a hash conflict, data may be 
lost.
E.g.: Record1(key1, offset1), Record2(key2, offset2)
Condition: hash(key1) == hash(key2)  &&  (offset1 < offset2)
Result: Record1 will be removed by mistake.


Did I misunderstand the implementation logic? Please give me some guidance, 
thank you very much.


1: The OffsetMap put logic does not deal with hash collisions; if hash(key1) == 
hash(key2), key1 will be overwritten.








2:the logic of retain record 



Question: Lazy client connections and status check

2022-02-03 Thread Jayesh Thakrar
Hi All,

Creating an admin, consumer, or producer client establishes connections lazily, 
in the sense that one only learns whether the connection is "healthy" when an 
"action" is carried out (e.g., sending a message).

There is no API call to check if the connection is healthy or if there are any 
issues with connectivity.
E.g., I could give an incorrect host/port address and will not know until I try 
the first action.

However, I do see a need for healthcheck after creating the client.
E.g., I could be looking at sending “yesterday’s data change for an 
application” which may be quite expensive. If I run the 1-hr query and then 
find that the Kafka connection was invalid, it’s a lot of time/effort wasted.

For such and other cases, it’s worthwhile to have a healthcheck that can be 
done before (and even after) running the expensive operation to validate the 
client connection.

Does it make sense to extend the client APIs (interfaces) with a healthcheck? 
Again, the term "healthcheck" may sound "sysadmin", but it's not. 
Alternatively, it could even be made implicit, as with a JDBC connection: once 
we know the connection has been established, we are assured that the client is 
ready for service.

Thanks,
Jayesh Thakrar
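
A possible workaround today is to issue a cheap metadata request with a short
timeout right after creating a client; a rough sketch using the AdminClient
(the bootstrap address and the 5-second timeout are arbitrary placeholders):

```
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class KafkaHealthCheck {
    public static boolean isReachable(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient admin = AdminClient.create(props)) {
            // If the cluster cannot be reached, this future fails or times out.
            admin.describeCluster().nodes().get(5, TimeUnit.SECONDS);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}
```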







Re: poll block question

2021-11-24 Thread Luke Chen
Hi xusheng,
I checked the stack trace and mapped it to the Kafka code; I can see we either
use Selector#selectNow or Selector#select(timeoutMs), which should never
hang forever.
If so, I think it might be that the `timeoutMs` is too large, and from your code
stack, the timeoutMs comes from Consumer#poll().
Could you reduce the poll timeout to see if that fixes the issue?

Also, I'm checking the code in v3.1, so it is also possible that there are
bugs in v2.3 that were fixed in a later release.

Thank you.
Luke
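
For reference, a bounded poll loop with the Duration-based API looks roughly
like this (topic name and timeout are placeholders):

```
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class BoundedPollLoop {
    static void run(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("xxx"));
        while (true) {
            // poll(Duration) is expected to return within roughly the given timeout,
            // even when no records are available.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                // process the record here
            }
        }
    }
}
```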

On Wed, Nov 24, 2021 at 11:12 PM xusheng  wrote:

> hi
> i find something not sure about poll.
> i need some help
> my kafka server version is 1.1.0
> my kafka client version is 2.3.0
> then 128 partition of topic xxx, start 128 thread in one application to
> consume message.
> always run well, but run about a week long , i find there is a consumer
> hang forever.
> strange is only one partition stop consume.
> i dump the Thread info :
>
> "ConsumerGroup-58" #302 prio=5 os_prio=0 tid=0x7f5419626800
> nid=0x191d9 runnable [0x7f540c95]
>
>java.lang.Thread.State: RUNNABLE
>
> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>
> - locked <0x0004461055c8> (a sun.nio.ch.Util$3)
>
> - locked <0x0004461055b8> (a java.util.Collections$UnmodifiableSet)
>
> - locked <0x000449932670> (a sun.nio.ch.EPollSelectorImpl)
>
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>
> at org.apache.kafka.common.network.Selector.select(Selector.java:794)
>
> at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
>
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
>
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)
>
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
>
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
>
>
>
>
>
>


poll block question

2021-11-24 Thread xusheng
Hi,
I found something I am not sure about with poll() and need some help.
My Kafka server version is 1.1.0.
My Kafka client version is 2.3.0.
Topic xxx has 128 partitions, and one application starts 128 threads to consume 
messages.
It always ran well, but after running for about a week I found a consumer that 
hangs forever.
Strangely, only one partition stops being consumed.
I dumped the thread info:

"ConsumerGroup-58" #302 prio=5 os_prio=0 tid=0x7f5419626800 nid=0x191d9 
runnable [0x7f540c95]

   java.lang.Thread.State: RUNNABLE

at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)

at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)

at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)

at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)

- locked <0x0004461055c8> (a sun.nio.ch.Util$3)

- locked <0x0004461055b8> (a java.util.Collections$UnmodifiableSet)

- locked <0x000449932670> (a sun.nio.ch.EPollSelectorImpl)

at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)

at org.apache.kafka.common.network.Selector.select(Selector.java:794)

at org.apache.kafka.common.network.Selector.poll(Selector.java:467)

at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)

at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)

at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)

at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)

at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)

at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)







Re: I have a question about apache kafka.

2021-04-08 Thread John Roesler
Hello Yun Han Nam,

The users@ list is more typical for this kind of topic, but this list is fine, 
too.

What’s the question?

Thanks,
John

On Thu, Apr 8, 2021, at 06:25, 남윤한[Yun Han Nam] wrote:
> Hi.
> 
> I want to ask you a question about Apache Kafka, is this the right one?
> 
> It is a technical question about error log that comes too often.​
>


I have a question about apache kafka.

2021-04-08 Thread 남윤한 [Yun Han Nam]
Hi.

I want to ask you a question about Apache Kafka, is this the right one?

It is a technical question about an error log that appears too often.


Re: Question on kafka connect's incremental rebalance algorithm

2021-04-05 Thread Luke Chen
Hi Ahmed,
I think this bug, KAFKA-12495
(https://issues.apache.org/jira/browse/KAFKA-12495), is the issue you
described; it is under code review now.
If not, please open another JIRA ticket to track it.

Thanks.
Luke

On Tue, Apr 6, 2021 at 4:18 AM Thouqueer Ahmed <
thouqueer.ah...@maplelabs.com> wrote:

> Hi,
>   What would happen when new worker joins after the
> synchronization barrier ?
>
> As per code -> performTaskAssignment function of IncrementalAssignor ->
> Boolean canRevoke is false when it is called during the 2nd rebalance.
> Hence revocation is skipped and only assignment is done.
> This would lead to imbalance in #tasks/#connectors.
>
> How is this case handled ?
>
> Thanks,
> Ahmed
>
>


Question on kafka connect's incremental rebalance algorithm

2021-04-05 Thread Thouqueer Ahmed
Hi,
  What would happen when new worker joins after the synchronization 
barrier ?

As per code -> performTaskAssignment function of IncrementalAssignor -> Boolean 
canRevoke is false when it is called during the 2nd rebalance. Hence revocation 
is skipped and only assignment is done.
This would lead to imbalance in #tasks/#connectors.

How is this case handled ?

Thanks,
Ahmed



Question for KafkaRequestHandler

2021-01-24 Thread 阮良
Hi Folks,
I am confused about the code below: why are the I/O threads set as daemon 
threads? In my opinion, a daemon thread is not suitable for such important work.


def createHandler(id: Int): Unit = synchronized {
  runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
  KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()
}
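
For context, marking a thread as a daemon in Java only means it will not keep
the JVM alive on its own; it does not change how the thread is scheduled. A tiny
illustration (the thread name just mirrors the snippet above):

```
public class DaemonThreadDemo {
    public static void main(String[] args) {
        Thread handler = new Thread(() -> {
            // request-handling work would go here
        }, "kafka-request-handler-0");
        // The JVM exits once all non-daemon threads have finished,
        // regardless of whether this daemon thread is still running.
        handler.setDaemon(true);
        handler.start();
    }
}
```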










Re: [jira] [Created] (KAFKA-12157) test Upgrade 2.7.0 from 2.0.0 occur a question

2021-01-07 Thread wenbing shen
Hi,team
I ran into a problem while testing a rolling upgrade from 2.0.0 to 2.7.0. I tried
to reproduce it again and failed. I am curious how this problem was caused; can
anyone help analyze it?
Thanks.
Wenbing

Wenbing Shen (Jira)  于2021年1月7日周四 下午6:26写道:

> Wenbing Shen created KAFKA-12157:
> 
>
>  Summary: test Upgrade 2.7.0 from 2.0.0 occur a question
>  Key: KAFKA-12157
>  URL: https://issues.apache.org/jira/browse/KAFKA-12157
>  Project: Kafka
>   Issue Type: Bug
>   Components: log
> Affects Versions: 2.7.0
> Reporter: Wenbing Shen
>  Attachments: 1001server.log, 1001serverlog.png, 1003server.log,
> 1003serverlog.png, 1003statechange.log
>
> I was in a test environment, rolling upgrade from version 2.0.0 to version
> 2.7.0, and encountered the following problems. When the rolling upgrade
> progressed to the second round, I stopped the first broker(1001) in the
> second round and the following error occurred. When an agent processes the
> client producer request, the starting offset of the leader epoch of the
> partition leader suddenly becomes 0, and then continues to process write
> requests for the same partition, and an error log will appear.All partition
> leaders with 1001 replicas are transferred to the 1003 node, and these
> partitions on the 1003 node will generate this error if they receive
> production requests.When I restart 1001, the 1001 broker will report the
> following error:
>
> [2021-01-06 16:46:55,955] ERROR (ReplicaFetcherThread-8-1003
> kafka.server.ReplicaFetcherThread 76) [ReplicaFetcher replicaId=1001,
> leaderId=1003, fetcherId=8] Unexpected error occurred while processing data
> for partition test-perf1-9 at offset 9666953
>
> I use the following command to make a production request:
>
> nohup /home/kafka/software/kafka/bin/kafka-producer-perf-test.sh
> --num-records 1 --record-size 1000 --throughput 3
> --producer-props bootstrap.servers=hdp1:9092,hdp2:9092,hdp3:9092 acks=1
> --topic test-perf1 > 1pro.log 2>&1 &
>
>
>
> I tried to reproduce the problem again, but after three attempts, it did
> not reappear. I am curious how this problem occurred and why the 1003
> broker resets startOffset to 0 of leaderEpoch 4 when the offset is assigned
> by broker in Log.append function.
>
>
>
> broker 1003: server.log
>
> [2021-01-06 16:37:59,492] WARN (data-plane-kafka-request-handler-131
> kafka.server.epoch.LeaderEpochFileCache 70) [LeaderEpochCache test-perf1-9]
> New epoch en
> try EpochEntry(epoch=4, startOffset=0) caused truncation of conflicting
> entries ListBuffer(EpochEntry(epoch=4, startOffset=9667122),
> EpochEntry(epoch=3, star
> tOffset=9195729), EpochEntry(epoch=2, startOffset=8348201)). Cache now
> contains 0 entries.
> [2021-01-06 16:37:59,493] WARN (data-plane-kafka-request-handler-131
> kafka.server.epoch.LeaderEpochFileCache 70) [LeaderEpochCache test-perf1-8]
> New epoch en
> try EpochEntry(epoch=3, startOffset=0) caused truncation of conflicting
> entries ListBuffer(EpochEntry(epoch=3, startOffset=9667478),
> EpochEntry(epoch=2, star
> tOffset=9196127), EpochEntry(epoch=1, startOffset=8342787)). Cache now
> contains 0 entries.
> [2021-01-06 16:37:59,495] WARN (data-plane-kafka-request-handler-131
> kafka.server.epoch.LeaderEpochFileCache 70) [LeaderEpochCache test-perf1-2]
> New epoch en
> try EpochEntry(epoch=3, startOffset=0) caused truncation of conflicting
> entries ListBuffer(EpochEntry(epoch=3, startOffset=9667478),
> EpochEntry(epoch=2, star
> tOffset=9196127), EpochEntry(epoch=1, startOffset=8336727)). Cache now
> contains 0 entries.
> [2021-01-06 16:37:59,498] ERROR (data-plane-kafka-request-handler-142
> kafka.server.ReplicaManager 76) [ReplicaManager broker=1003] Error
> processing append op
> eration on partition test-perf1-9
> java.lang.IllegalArgumentException: Received invalid partition leader
> epoch entry EpochEntry(epoch=4, startOffset=-3)
>  at
> kafka.server.epoch.LeaderEpochFileCache.assign(LeaderEpochFileCache.scala:67)
>  at
> kafka.server.epoch.LeaderEpochFileCache.assign(LeaderEpochFileCache.scala:59)
>  at kafka.log.Log.maybeAssignEpochStartOffset(Log.scala:1268)
>  at kafka.log.Log.$anonfun$append$6(Log.scala:1181)
>  at kafka.log.Log$$Lambda$935/184936331.accept(Unknown Source)
>  at java.lang.Iterable.forEach(Iterable.java:75)
>  at kafka.log.Log.$anonfun$append$2(Log.scala:1179)
>  at kafka.log.Log.append(Log.scala:2387)
>  at kafka.log.Log.appendAsLeader(Log.scala:1050)
>  at
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1079)
>  at kafka.cluster.Partition.appe

[jira] [Created] (KAFKA-12157) test Upgrade 2.7.0 from 2.0.0 occur a question

2021-01-07 Thread Wenbing Shen (Jira)
Wenbing Shen created KAFKA-12157:


 Summary: test Upgrade 2.7.0 from 2.0.0 occur a question
 Key: KAFKA-12157
 URL: https://issues.apache.org/jira/browse/KAFKA-12157
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 2.7.0
Reporter: Wenbing Shen
 Attachments: 1001server.log, 1001serverlog.png, 1003server.log, 
1003serverlog.png, 1003statechange.log

I was in a test environment, performing a rolling upgrade from version 2.0.0 to 
version 2.7.0, and encountered the following problems. When the rolling upgrade 
progressed to the second round, I stopped the first broker (1001) in that round 
and the following error occurred. When a broker processes a client producer 
request, the start offset of the partition leader's leader epoch suddenly 
becomes 0, and as the broker continues to process write requests for the same 
partition, an error log appears. All partition leaders with replicas on 1001 are 
transferred to the 1003 node, and these partitions on the 1003 node will 
generate this error if they receive produce requests. When I restart 1001, the 
1001 broker reports the following error:

[2021-01-06 16:46:55,955] ERROR (ReplicaFetcherThread-8-1003 
kafka.server.ReplicaFetcherThread 76) [ReplicaFetcher replicaId=1001, 
leaderId=1003, fetcherId=8] Unexpected error occurred while processing data for 
partition test-perf1-9 at offset 9666953

I use the following command to make a production request:

nohup /home/kafka/software/kafka/bin/kafka-producer-perf-test.sh --num-records 
1 --record-size 1000 --throughput 3 --producer-props 
bootstrap.servers=hdp1:9092,hdp2:9092,hdp3:9092 acks=1 --topic test-perf1 > 
1pro.log 2>&1 &

 

I tried to reproduce the problem again, but after three attempts it did not 
reappear. I am curious how this problem occurred, and why the 1003 broker resets 
the startOffset of leaderEpoch 4 to 0 when the offset is assigned by the broker 
in the Log.append function.

 

broker 1003: server.log

[2021-01-06 16:37:59,492] WARN (data-plane-kafka-request-handler-131 
kafka.server.epoch.LeaderEpochFileCache 70) [LeaderEpochCache test-perf1-9] New 
epoch en
try EpochEntry(epoch=4, startOffset=0) caused truncation of conflicting entries 
ListBuffer(EpochEntry(epoch=4, startOffset=9667122), EpochEntry(epoch=3, star
tOffset=9195729), EpochEntry(epoch=2, startOffset=8348201)). Cache now contains 
0 entries.
[2021-01-06 16:37:59,493] WARN (data-plane-kafka-request-handler-131 
kafka.server.epoch.LeaderEpochFileCache 70) [LeaderEpochCache test-perf1-8] New 
epoch en
try EpochEntry(epoch=3, startOffset=0) caused truncation of conflicting entries 
ListBuffer(EpochEntry(epoch=3, startOffset=9667478), EpochEntry(epoch=2, star
tOffset=9196127), EpochEntry(epoch=1, startOffset=8342787)). Cache now contains 
0 entries.
[2021-01-06 16:37:59,495] WARN (data-plane-kafka-request-handler-131 
kafka.server.epoch.LeaderEpochFileCache 70) [LeaderEpochCache test-perf1-2] New 
epoch en
try EpochEntry(epoch=3, startOffset=0) caused truncation of conflicting entries 
ListBuffer(EpochEntry(epoch=3, startOffset=9667478), EpochEntry(epoch=2, star
tOffset=9196127), EpochEntry(epoch=1, startOffset=8336727)). Cache now contains 
0 entries.
[2021-01-06 16:37:59,498] ERROR (data-plane-kafka-request-handler-142 
kafka.server.ReplicaManager 76) [ReplicaManager broker=1003] Error processing 
append op
eration on partition test-perf1-9
java.lang.IllegalArgumentException: Received invalid partition leader epoch 
entry EpochEntry(epoch=4, startOffset=-3)
 at 
kafka.server.epoch.LeaderEpochFileCache.assign(LeaderEpochFileCache.scala:67)
 at 
kafka.server.epoch.LeaderEpochFileCache.assign(LeaderEpochFileCache.scala:59)
 at kafka.log.Log.maybeAssignEpochStartOffset(Log.scala:1268)
 at kafka.log.Log.$anonfun$append$6(Log.scala:1181)
 at kafka.log.Log$$Lambda$935/184936331.accept(Unknown Source)
 at java.lang.Iterable.forEach(Iterable.java:75)
 at kafka.log.Log.$anonfun$append$2(Log.scala:1179)
 at kafka.log.Log.append(Log.scala:2387)
 at kafka.log.Log.appendAsLeader(Log.scala:1050)
 at 
kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1079)
 at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1067)
 at 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:953)
 at kafka.server.ReplicaManager$$Lambda$1025/1369541490.apply(Unknown Source)
 at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
 at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
 at scala.collection.mutable.HashMap.map(HashMap.scala:35)
 at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:941)
 at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:621)
 at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:625)

 

broker 1001:server.log

[2021-01-0

question

2020-11-04 Thread ????
Hello, 

I'm working on Kafka development and I have a question: does Kafka support an 
Android client?

Re: Question Regarding Offset Behavior When Calling Poll()

2020-10-08 Thread Zhen Zhang
Thank you.

Regards,
Zhen Zhang
Software Engineer
[image: Twilio] <https://www.twilio.com/?utm_source=email_signature>
MOBILE (949) 771-6073
EMAIL zzh...@twilio.com


On Wed, Oct 7, 2020 at 11:35 PM Matthias J. Sax  wrote:

> I guess it's a question of the client implementation and you cannot
> infer the behavior from the Java client.
>
> I would _assume_ that the offset is _not_ advanced. But the behavior is
> not define by Kafka itself, but it's up the the client implementation
> itself. Thus, only the client's docs or community can clarify.
>
> Sorry that I cannot provide a better answer.
>
> -Matthias
>
>
> On 9/25/20 1:03 AM, Zhen Zhang wrote:
> > Hi,
> >
> > Sorry for the late reply, let me clarify on this.
> >
> > I am developing using Golang so I used a library based on librdkafka, and
> > there's one function, ReadMesage(), which is a wrapper on top of the
> poll()
> > function, except that it will only poll one message(record) at a time and
> > return either one of the following,
> >
> > 1. (msg, nil) -> normal situation with no error;
> > 2. (nil, err) -> err is a Kafka timeout error;
> > 3. (nil, err) -> err is general error;
> > 4. (msg, err) -> err is partition-specific error.
> >
> > When I was browsing the javadocs
> >
> https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> ,
> > I noticed the concept of `Offsets and Consumer Position`, so I am
> wondering
> > that in the event of the situation 2, 3 and 4, will the position still
> > increment by 1 even if what I get from ReadMessage() is an error?
> >
> > I tried to read the docs but didn't find anything useful I can relate
> to, I
> > tried to ask for help from the contributors of the library I was using
> but
> > didn't get an answer, not sure if this question is too obvious or too
> > noobie lolol. Since that library I used is an implementation of the Kafka
> > protocol, I decided to ask it here and I sincerely hope I can get some
> > insights from a Kafka guru.
> >
> > Thanks,
> > Zhen Zhang
> > Software Engineer
> > [image: Twilio] <https://www.twilio.com/?utm_source=email_signature>
> > MOBILE (949) 771-6073
> > EMAIL zzh...@twilio.com
> >
> >
> > On Wed, Sep 23, 2020 at 9:45 AM Matthias J. Sax 
> wrote:
> >
> >> I guess it depends where the exception comes from? Can you clarify?
> >>
> >> -Matthias
> >>
> >> On 9/23/20 12:53 AM, Zhen Zhang wrote:
> >>> Hi there,
> >>>
> >>> I am new to Kafka and I would like to get some clarifications for a
> >> newbie
> >>> question,
> >>>
> >>> Let's say if I have set up my consumer's "enable.auto.commit" to false,
> >> and
> >>> then poll the records one at a time. So when calling poll(), starting
> >> from
> >>> offset 0, if any exception is thrown, should I expect to get the record
> >> at
> >>> offset 0 or offset 1 when I call poll() again? The reason I'm asking
> for
> >>> this is bc in the Kafka Doc, it says that,
> >>> "The position of the consumer gives the offset of the next record that
> >> will
> >>> be given out. It will be one larger than the highest offset the
> consumer
> >>> has seen in that partition. It automatically advances every time the
> >>> consumer receives messages in a call to poll(Duration)."
> >>>
> >>> But in my described situation above, an exception is thrown, I'm not
> sure
> >>> if this is counted as a successful poll (meaning that the next poll()
> >> will
> >>> give the next record) or a failed one (meaning that the next poll()
> will
> >>> give the same record again).
> >>>
> >>> I would really appreciate it for your help.
> >>>
> >>> Thanks,
> >>> Zhen Zhang
> >>> Software Engineer
> >>> [image: Twilio] <https://www.twilio.com/?utm_source=email_signature>
> >>> MOBILE (949) 771-6073
> >>> EMAIL zzh...@twilio.com
> >>>
> >>
> >
>
>


Re: Question Regarding Offset Behavior When Calling Poll()

2020-10-08 Thread Matthias J. Sax
I guess it's a question of the client implementation and you cannot
infer the behavior from the Java client.

I would _assume_ that the offset is _not_ advanced. But the behavior is
not defined by Kafka itself; it's up to the client implementation.
Thus, only the client's docs or community can clarify.

Sorry that I cannot provide a better answer.

-Matthias


On 9/25/20 1:03 AM, Zhen Zhang wrote:
> Hi,
> 
> Sorry for the late reply, let me clarify on this.
> 
> I am developing using Golang so I used a library based on librdkafka, and
> there's one function, ReadMesage(), which is a wrapper on top of the poll()
> function, except that it will only poll one message(record) at a time and
> return either one of the following,
> 
> 1. (msg, nil) -> normal situation with no error;
> 2. (nil, err) -> err is a Kafka timeout error;
> 3. (nil, err) -> err is general error;
> 4. (msg, err) -> err is partition-specific error.
> 
> When I was browsing the javadocs
> https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html,
> I noticed the concept of `Offsets and Consumer Position`, so I am wondering
> that in the event of the situation 2, 3 and 4, will the position still
> increment by 1 even if what I get from ReadMessage() is an error?
> 
> I tried to read the docs but didn't find anything useful I can relate to, I
> tried to ask for help from the contributors of the library I was using but
> didn't get an answer, not sure if this question is too obvious or too
> noobie lolol. Since that library I used is an implementation of the Kafka
> protocol, I decided to ask it here and I sincerely hope I can get some
> insights from a Kafka guru.
> 
> Thanks,
> Zhen Zhang
> Software Engineer
> [image: Twilio] <https://www.twilio.com/?utm_source=email_signature>
> MOBILE (949) 771-6073
> EMAIL zzh...@twilio.com
> 
> 
> On Wed, Sep 23, 2020 at 9:45 AM Matthias J. Sax  wrote:
> 
>> I guess it depends where the exception comes from? Can you clarify?
>>
>> -Matthias
>>
>> On 9/23/20 12:53 AM, Zhen Zhang wrote:
>>> Hi there,
>>>
>>> I am new to Kafka and I would like to get some clarifications for a
>> newbie
>>> question,
>>>
>>> Let's say if I have set up my consumer's "enable.auto.commit" to false,
>> and
>>> then poll the records one at a time. So when calling poll(), starting
>> from
>>> offset 0, if any exception is thrown, should I expect to get the record
>> at
>>> offset 0 or offset 1 when I call poll() again? The reason I'm asking for
>>> this is bc in the Kafka Doc, it says that,
>>> "The position of the consumer gives the offset of the next record that
>> will
>>> be given out. It will be one larger than the highest offset the consumer
>>> has seen in that partition. It automatically advances every time the
>>> consumer receives messages in a call to poll(Duration)."
>>>
>>> But in my described situation above, an exception is thrown, I'm not sure
>>> if this is counted as a successful poll (meaning that the next poll()
>> will
>>> give the next record) or a failed one (meaning that the next poll() will
>>> give the same record again).
>>>
>>> I would really appreciate it for your help.
>>>
>>> Thanks,
>>> Zhen Zhang
>>> Software Engineer
>>> [image: Twilio] <https://www.twilio.com/?utm_source=email_signature>
>>> MOBILE (949) 771-6073
>>> EMAIL zzh...@twilio.com
>>>
>>
> 



signature.asc
Description: OpenPGP digital signature


Re: Question Regarding Offset Behavior When Calling Poll()

2020-09-25 Thread Zhen Zhang
Hi,

Sorry for the late reply, let me clarify on this.

I am developing using Golang so I used a library based on librdkafka, and
there's one function, ReadMesage(), which is a wrapper on top of the poll()
function, except that it will only poll one message(record) at a time and
return either one of the following,

1. (msg, nil) -> normal situation with no error;
2. (nil, err) -> err is a Kafka timeout error;
3. (nil, err) -> err is general error;
4. (msg, err) -> err is partition-specific error.

When I was browsing the javadocs
https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html,
I noticed the concept of `Offsets and Consumer Position`, so I am wondering
that in the event of the situation 2, 3 and 4, will the position still
increment by 1 even if what I get from ReadMessage() is an error?

I tried to read the docs but didn't find anything useful I can relate to, I
tried to ask for help from the contributors of the library I was using but
didn't get an answer, not sure if this question is too obvious or too
noobie lolol. Since that library I used is an implementation of the Kafka
protocol, I decided to ask it here and I sincerely hope I can get some
insights from a Kafka guru.

Thanks,
Zhen Zhang
Software Engineer
[image: Twilio] <https://www.twilio.com/?utm_source=email_signature>
MOBILE (949) 771-6073
EMAIL zzh...@twilio.com


On Wed, Sep 23, 2020 at 9:45 AM Matthias J. Sax  wrote:

> I guess it depends where the exception comes from? Can you clarify?
>
> -Matthias
>
> On 9/23/20 12:53 AM, Zhen Zhang wrote:
> > Hi there,
> >
> > I am new to Kafka and I would like to get some clarifications for a
> newbie
> > question,
> >
> > Let's say if I have set up my consumer's "enable.auto.commit" to false,
> and
> > then poll the records one at a time. So when calling poll(), starting
> from
> > offset 0, if any exception is thrown, should I expect to get the record
> at
> > offset 0 or offset 1 when I call poll() again? The reason I'm asking for
> > this is bc in the Kafka Doc, it says that,
> > "The position of the consumer gives the offset of the next record that
> will
> > be given out. It will be one larger than the highest offset the consumer
> > has seen in that partition. It automatically advances every time the
> > consumer receives messages in a call to poll(Duration)."
> >
> > But in my described situation above, an exception is thrown, I'm not sure
> > if this is counted as a successful poll (meaning that the next poll()
> will
> > give the next record) or a failed one (meaning that the next poll() will
> > give the same record again).
> >
> > I would really appreciate it for your help.
> >
> > Thanks,
> > Zhen Zhang
> > Software Engineer
> > [image: Twilio] <https://www.twilio.com/?utm_source=email_signature>
> > MOBILE (949) 771-6073
> > EMAIL zzh...@twilio.com
> >
>


Re: Question Regarding Offset Behavior When Calling Poll()

2020-09-23 Thread Matthias J. Sax
I guess it depends where the exception comes from? Can you clarify?

-Matthias

On 9/23/20 12:53 AM, Zhen Zhang wrote:
> Hi there,
> 
> I am new to Kafka and I would like to get some clarifications for a newbie
> question,
> 
> Let's say if I have set up my consumer's "enable.auto.commit" to false, and
> then poll the records one at a time. So when calling poll(), starting from
> offset 0, if any exception is thrown, should I expect to get the record at
> offset 0 or offset 1 when I call poll() again? The reason I'm asking for
> this is bc in the Kafka Doc, it says that,
> "The position of the consumer gives the offset of the next record that will
> be given out. It will be one larger than the highest offset the consumer
> has seen in that partition. It automatically advances every time the
> consumer receives messages in a call to poll(Duration)."
> 
> But in my described situation above, an exception is thrown, I'm not sure
> if this is counted as a successful poll (meaning that the next poll() will
> give the next record) or a failed one (meaning that the next poll() will
> give the same record again).
> 
> I would really appreciate it for your help.
> 
> Thanks,
> Zhen Zhang
> Software Engineer
> [image: Twilio] <https://www.twilio.com/?utm_source=email_signature>
> MOBILE (949) 771-6073
> EMAIL zzh...@twilio.com
> 


Question Regarding Offset Behavior When Calling Poll()

2020-09-23 Thread Zhen Zhang
Hi there,

I am new to Kafka and I would like to get some clarifications for a newbie
question,

Let's say if I have set up my consumer's "enable.auto.commit" to false, and
then poll the records one at a time. So when calling poll(), starting from
offset 0, if any exception is thrown, should I expect to get the record at
offset 0 or offset 1 when I call poll() again? The reason I'm asking is
because the Kafka docs say that,
"The position of the consumer gives the offset of the next record that will
be given out. It will be one larger than the highest offset the consumer
has seen in that partition. It automatically advances every time the
consumer receives messages in a call to poll(Duration)."

But in my described situation above, an exception is thrown, I'm not sure
if this is counted as a successful poll (meaning that the next poll() will
give the next record) or a failed one (meaning that the next poll() will
give the same record again).

I would really appreciate it for your help.

Thanks,
Zhen Zhang
Software Engineer
[image: Twilio] <https://www.twilio.com/?utm_source=email_signature>
MOBILE (949) 771-6073
EMAIL zzh...@twilio.com
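
For the Java client specifically, the "position" described in the quoted
Javadoc can be inspected directly with position(); a minimal sketch (broker
address, topic, and group id are placeholders):

```
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PositionDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "position-demo");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("demo-topic", 0);
            consumer.assign(Collections.singletonList(tp));

            System.out.println("position before poll: " + consumer.position(tp));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            // The position is the offset of the next record that will be given out,
            // so it moves past whatever this poll() actually returned.
            System.out.println("returned " + records.count()
                    + " records, position now: " + consumer.position(tp));
        }
    }
}
```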


Question about [KIP-354]: Compaction can be delayed indefinitely in logs without traffic

2020-08-21 Thread Christian Apolloni
Hello,

It's my understanding that [KIP-354] should ensure eligibility for compaction 
after the min.compaction.lag.ms is elapsed. The parameter should ensure that a 
new segment is rolled within the given time. By doing some tests and looking 
into the code, I think this is not always the case.

As far as I understand, the trigger for eventually rolling a new segment is 
only called when a new message is appended into the log. In low-traffic topics 
this could mean that no new message is received for a while. In the meantime 
the min.compaction.lag.ms can elapse but since no new message arrives, no new 
segment is rolled, causing the last active segment to remain uncompacted.

Is my understanding correct? If yes, is this behaviour unexpected or a known 
limitation of the compaction mechanism?

Kind regards,

-- 
Christian Apolloni
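
For reference, the topic-level settings involved in this discussion can be set
with the AdminClient as sketched below (topic name and values are placeholders;
this only shows the knobs, it is not meant as an answer to whether the active
segment gets rolled without traffic):

```
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class CompactionLagConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "low-traffic-topic");
            Collection<AlterConfigOp> ops = Arrays.asList(
                new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry("min.compaction.lag.ms", "60000"), AlterConfigOp.OpType.SET),
                // max.compaction.lag.ms is the setting KIP-354 introduced.
                new AlterConfigOp(new ConfigEntry("max.compaction.lag.ms", "600000"), AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get();
        }
    }
}
```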




Re: Question around release plans for Apache Kafka 2.3.2 and 2.4.2

2020-06-24 Thread Gokul Ramanan Subramanian
Thanks Gwen and Ismael.

On Wed, Jun 24, 2020 at 6:52 PM Gwen Shapira  wrote:

> Kafka follows a "fully compatible model" and new features are always
> optional. We have extensive compatibility testing to make sure rolling
> upgrades are safe and painless.
> This investment in compatibility allows us to reduce the effort needed in
> maintaining old versions (Security CVE is the obvious exception, as well as
> data-loss bugs).
>
> Not every OSS project has the same model, but it has worked for Kafka for
> years now (since the awful upgrade from 0.7 to 0.8 in 2014).
>
> Gwen
>
>
> On Wed, Jun 24, 2020 at 10:40 AM Gokul Ramanan Subramanian <
> gokul24...@gmail.com> wrote:
>
> > I agree that is an option, but is there any reason to not have a 2.3.2
> and
> > 2.4.2 released? If so, it would be nice to know about these reasons.
> > Appreciate your time on this.
> >
> > On Wed, Jun 24, 2020 at 6:35 PM Ismael Juma  wrote:
> >
> > > I think there's some confusion here. You can upgrade to AK 2.5.0 and
> > > completely ignore ZK and TLS. It's completely optional.
> > >
> > > Ismael
> > >
> > > On Wed, Jun 24, 2020 at 10:20 AM Gokul Ramanan Subramanian <
> > > gokul24...@gmail.com> wrote:
> > >
> > > > Any updates on Kafka 2.3.2 and 2.4.2? Given the complexity of
> migrating
> > > > from non-encrypted TLS to encrypted TLS connection for ZooKeeper, it
> > > would
> > > > be nice to have a bug-free version of 2.3 and 2.4.
> > > >
> > > > Is there a technical reason why we hesitate to get these versions
> out?
> > Or
> > > > is it that no one has got around to it?
> > > >
> > > > On Wed, Jun 24, 2020 at 3:19 PM Sankalp Bhatia <
> > > sankalpbhati...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thanks Ismael for the response.
> > > > >
> > > > > For our clusters running 2.3.1 and 2.4.1, we saw some issues which
> > had
> > > > > 2.3.2 and 2.4.2 as the fix versions. I looked at 2.5.0 but since it
> > > > > introduces some major changes like support for ZK encryption and a
> > few
> > > > > others, I was wondering if we should choose a smaller upgrade in
> such
> > > > cases
> > > > > as we don't really require the new features in 2.5 and above right
> > now.
> > > > >
> > > > > -
> > > > > Sankalp
> > > > >
> > > > > On Wed, 24 Jun 2020 at 14:23, Ismael Juma 
> wrote:
> > > > >
> > > > > > Hi Sankalp,
> > > > > >
> > > > > > Is there a reason why you cannot upgrade to Apache Kafka 2.5.0
> > > instead?
> > > > > We
> > > > > > are working on the 2.5.1 release, which would be the recommended
> > > > release.
> > > > > >
> > > > > > Ismael
> > > > > >
> > > > > > On Wed, Jun 24, 2020 at 6:18 AM Sankalp Bhatia <
> > > > > sankalpbhati...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi All,
> > > > > > >
> > > > > > > I would like to know if there are any plans to release a 2.3.2
> > and
> > > > > 2.4.2
> > > > > > > versions for Apache Kafka in the near future. I see there are
> > some
> > > > > issues
> > > > > > > marked as fixed in these two versions
> > > > > > > (https://tinyurl.com/ycdpz5cb).
> > > > > > > However, I could not find a branch/tag corresponding to these
> > > > versions
> > > > > in
> > > > > > > the github repository (https://github.com/apache/kafka).
> > > > > > >
> > > > > > > Also, It would be great if someone can help me understand or
> > share
> > > > any
> > > > > > > documentation around the release processes (specifically on
> when
> > we
> > > > > > decide
> > > > > > > to release a new bug fix version like the ones mentioned
> above.)
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Sankalp
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
> --
> Gwen Shapira
> Engineering Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>


Re: Question around release plans for Apache Kafka 2.3.2 and 2.4.2

2020-06-24 Thread Gwen Shapira
Kafka follows a "fully compatible model" and new features are always
optional. We have extensive compatibility testing to make sure rolling
upgrades are safe and painless.
This investment in compatibility allows us to reduce the effort needed in
maintaining old versions (Security CVE is the obvious exception, as well as
data-loss bugs).

Not every OSS project has the same model, but it has worked for Kafka for
years now (since the awful upgrade from 0.7 to 0.8 in 2014).

Gwen


On Wed, Jun 24, 2020 at 10:40 AM Gokul Ramanan Subramanian <
gokul24...@gmail.com> wrote:

> I agree that is an option, but is there any reason to not have a 2.3.2 and
> 2.4.2 released? If so, it would be nice to know about these reasons.
> Appreciate your time on this.
>
> On Wed, Jun 24, 2020 at 6:35 PM Ismael Juma  wrote:
>
> > I think there's some confusion here. You can upgrade to AK 2.5.0 and
> > completely ignore ZK and TLS. It's completely optional.
> >
> > Ismael
> >
> > On Wed, Jun 24, 2020 at 10:20 AM Gokul Ramanan Subramanian <
> > gokul24...@gmail.com> wrote:
> >
> > > Any updates on Kafka 2.3.2 and 2.4.2? Given the complexity of migrating
> > > from non-encrypted TLS to encrypted TLS connection for ZooKeeper, it
> > would
> > > be nice to have a bug-free version of 2.3 and 2.4.
> > >
> > > Is there a technical reason why we hesitate to get these versions out?
> Or
> > > is it that no one has got around to it?
> > >
> > > On Wed, Jun 24, 2020 at 3:19 PM Sankalp Bhatia <
> > sankalpbhati...@gmail.com>
> > > wrote:
> > >
> > > > Thanks Ismael for the response.
> > > >
> > > > For our clusters running 2.3.1 and 2.4.1, we saw some issues which
> had
> > > > 2.3.2 and 2.4.2 as the fix versions. I looked at 2.5.0 but since it
> > > > introduces some major changes like support for ZK encryption and a
> few
> > > > others, I was wondering if we should choose a smaller upgrade in such
> > > cases
> > > > as we don't really require the new features in 2.5 and above right
> now.
> > > >
> > > > -
> > > > Sankalp
> > > >
> > > > On Wed, 24 Jun 2020 at 14:23, Ismael Juma  wrote:
> > > >
> > > > > Hi Sankalp,
> > > > >
> > > > > Is there a reason why you cannot upgrade to Apache Kafka 2.5.0
> > instead?
> > > > We
> > > > > are working on the 2.5.1 release, which would be the recommended
> > > release.
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Wed, Jun 24, 2020 at 6:18 AM Sankalp Bhatia <
> > > > sankalpbhati...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > I would like to know if there are any plans to release a 2.3.2
> and
> > > > 2.4.2
> > > > > > versions for Apache Kafka in the near future. I see there are
> some
> > > > issues
> > > > > > marked as fixed in these two versions
> > > > > > (https://tinyurl.com/ycdpz5cb).
> > > > > > However, I could not find a branch/tag corresponding to these
> > > versions
> > > > in
> > > > > > the github repository (https://github.com/apache/kafka).
> > > > > >
> > > > > > Also, It would be great if someone can help me understand or
> share
> > > any
> > > > > > documentation around the release processes (specifically on when
> we
> > > > > decide
> > > > > > to release a new bug fix version like the ones mentioned above.)
> > > > > >
> > > > > > Thanks,
> > > > > > Sankalp
> > > > > >
> > > > >
> > > >
> > >
> >
>


-- 
Gwen Shapira
Engineering Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Re: Question around release plans for Apache Kafka 2.3.2 and 2.4.2

2020-06-24 Thread Gokul Ramanan Subramanian
I agree that is an option, but is there any reason to not have a 2.3.2 and
2.4.2 released? If so, it would be nice to know about these reasons.
Appreciate your time on this.

On Wed, Jun 24, 2020 at 6:35 PM Ismael Juma  wrote:

> I think there's some confusion here. You can upgrade to AK 2.5.0 and
> completely ignore ZK and TLS. It's completely optional.
>
> Ismael
>
> On Wed, Jun 24, 2020 at 10:20 AM Gokul Ramanan Subramanian <
> gokul24...@gmail.com> wrote:
>
> > Any updates on Kafka 2.3.2 and 2.4.2? Given the complexity of migrating
> > from non-encrypted TLS to encrypted TLS connection for ZooKeeper, it
> would
> > be nice to have a bug-free version of 2.3 and 2.4.
> >
> > Is there a technical reason why we hesitate to get these versions out? Or
> > is it that no one has got around to it?
> >
> > On Wed, Jun 24, 2020 at 3:19 PM Sankalp Bhatia <
> sankalpbhati...@gmail.com>
> > wrote:
> >
> > > Thanks Ismael for the response.
> > >
> > > For our clusters running 2.3.1 and 2.4.1, we saw some issues which had
> > > 2.3.2 and 2.4.2 as the fix versions. I looked at 2.5.0 but since it
> > > introduces some major changes like support for ZK encryption and a few
> > > others, I was wondering if we should choose a smaller upgrade in such
> > cases
> > > as we don't really require the new features in 2.5 and above right now.
> > >
> > > -
> > > Sankalp
> > >
> > > On Wed, 24 Jun 2020 at 14:23, Ismael Juma  wrote:
> > >
> > > > Hi Sankalp,
> > > >
> > > > Is there a reason why you cannot upgrade to Apache Kafka 2.5.0
> instead?
> > > We
> > > > are working on the 2.5.1 release, which would be the recommended
> > release.
> > > >
> > > > Ismael
> > > >
> > > > On Wed, Jun 24, 2020 at 6:18 AM Sankalp Bhatia <
> > > sankalpbhati...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > I would like to know if there are any plans to release a 2.3.2 and
> > > 2.4.2
> > > > > versions for Apache Kafka in the near future. I see there are some
> > > issues
> > > > > marked as fixed in these two versions
> > > > > (https://tinyurl.com/ycdpz5cb).
> > > > > However, I could not find a branch/tag corresponding to these
> > versions
> > > in
> > > > > the github repository (https://github.com/apache/kafka).
> > > > >
> > > > > Also, It would be great if someone can help me understand or share
> > any
> > > > > documentation around the release processes (specifically on when we
> > > > decide
> > > > > to release a new bug fix version like the ones mentioned above.)
> > > > >
> > > > > Thanks,
> > > > > Sankalp
> > > > >
> > > >
> > >
> >
>


Re: Question around release plans for Apache Kafka 2.3.2 and 2.4.2

2020-06-24 Thread Ismael Juma
I think there's some confusion here. You can upgrade to AK 2.5.0 and
completely ignore ZK and TLS. It's completely optional.

Ismael

On Wed, Jun 24, 2020 at 10:20 AM Gokul Ramanan Subramanian <
gokul24...@gmail.com> wrote:

> Any updates on Kafka 2.3.2 and 2.4.2? Given the complexity of migrating
> from non-encrypted TLS to encrypted TLS connection for ZooKeeper, it would
> be nice to have a bug-free version of 2.3 and 2.4.
>
> Is there a technical reason why we hesitate to get these versions out? Or
> is it that no one has got around to it?
>
> On Wed, Jun 24, 2020 at 3:19 PM Sankalp Bhatia 
> wrote:
>
> > Thanks Ismael for the response.
> >
> > For our clusters running 2.3.1 and 2.4.1, we saw some issues which had
> > 2.3.2 and 2.4.2 as the fix versions. I looked at 2.5.0 but since it
> > introduces some major changes like support for ZK encryption and a few
> > others, I was wondering if we should choose a smaller upgrade in such
> cases
> > as we don't really require the new features in 2.5 and above right now.
> >
> > -
> > Sankalp
> >
> > On Wed, 24 Jun 2020 at 14:23, Ismael Juma  wrote:
> >
> > > Hi Sankalp,
> > >
> > > Is there a reason why you cannot upgrade to Apache Kafka 2.5.0 instead?
> > We
> > > are working on the 2.5.1 release, which would be the recommended
> release.
> > >
> > > Ismael
> > >
> > > On Wed, Jun 24, 2020 at 6:18 AM Sankalp Bhatia <
> > sankalpbhati...@gmail.com>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I would like to know if there are any plans to release a 2.3.2 and
> > 2.4.2
> > > > versions for Apache Kafka in the near future. I see there are some
> > issues
> > > > marked as fixed in these two versions
> > > > (https://tinyurl.com/ycdpz5cb).
> > > > However, I could not find a branch/tag corresponding to these
> versions
> > in
> > > > the github repository (https://github.com/apache/kafka).
> > > >
> > > > Also, It would be great if someone can help me understand or share
> any
> > > > documentation around the release processes (specifically on when we
> > > decide
> > > > to release a new bug fix version like the ones mentioned above.)
> > > >
> > > > Thanks,
> > > > Sankalp
> > > >
> > >
> >
>


Re: Question around release plans for Apache Kafka 2.3.2 and 2.4.2

2020-06-24 Thread Gokul Ramanan Subramanian
Any updates on Kafka 2.3.2 and 2.4.2? Given the complexity of migrating
from non-encrypted TLS to encrypted TLS connection for ZooKeeper, it would
be nice to have a bug-free version of 2.3 and 2.4.

Is there a technical reason why we hesitate to get these versions out? Or
is it that no one has got around to it?

On Wed, Jun 24, 2020 at 3:19 PM Sankalp Bhatia 
wrote:

> Thanks Ismael for the response.
>
> For our clusters running 2.3.1 and 2.4.1, we saw some issues which had
> 2.3.2 and 2.4.2 as the fix versions. I looked at 2.5.0 but since it
> introduces some major changes like support for ZK encryption and a few
> others, I was wondering if we should choose a smaller upgrade in such cases
> as we don't really require the new features in 2.5 and above right now.
>
> -
> Sankalp
>
> On Wed, 24 Jun 2020 at 14:23, Ismael Juma  wrote:
>
> > Hi Sankalp,
> >
> > Is there a reason why you cannot upgrade to Apache Kafka 2.5.0 instead?
> We
> > are working on the 2.5.1 release, which would be the recommended release.
> >
> > Ismael
> >
> > On Wed, Jun 24, 2020 at 6:18 AM Sankalp Bhatia <
> sankalpbhati...@gmail.com>
> > wrote:
> >
> > > Hi All,
> > >
> > > I would like to know if there are any plans to release a 2.3.2 and
> 2.4.2
> > > versions for Apache Kafka in the near future. I see there are some
> issues
> > > marked as fixed in these two versions
> > > (https://tinyurl.com/ycdpz5cb).
> > > However, I could not find a branch/tag corresponding to these versions
> in
> > > the github repository (https://github.com/apache/kafka).
> > >
> > > Also, It would be great if someone can help me understand or share any
> > > documentation around the release processes (specifically on when we
> > decide
> > > to release a new bug fix version like the ones mentioned above.)
> > >
> > > Thanks,
> > > Sankalp
> > >
> >
>


Re: Question around release plans for Apache Kafka 2.3.2 and 2.4.2

2020-06-24 Thread Sankalp Bhatia
Thanks Ismael for the response.

For our clusters running 2.3.1 and 2.4.1, we saw some issues which had
2.3.2 and 2.4.2 as the fix versions. I looked at 2.5.0 but since it
introduces some major changes like support for ZK encryption and a few
others, I was wondering if we should choose a smaller upgrade in such cases
as we don't really require the new features in 2.5 and above right now.

-
Sankalp

On Wed, 24 Jun 2020 at 14:23, Ismael Juma  wrote:

> Hi Sankalp,
>
> Is there a reason why you cannot upgrade to Apache Kafka 2.5.0 instead? We
> are working on the 2.5.1 release, which would be the recommended release.
>
> Ismael
>
> On Wed, Jun 24, 2020 at 6:18 AM Sankalp Bhatia 
> wrote:
>
> > Hi All,
> >
> > I would like to know if there are any plans to release a 2.3.2 and 2.4.2
> > versions for Apache Kafka in the near future. I see there are some issues
> > marked as fixed in these two versions
> > (https://tinyurl.com/ycdpz5cb).
> > However, I could not find a branch/tag corresponding to these versions in
> > the github repository (https://github.com/apache/kafka).
> >
> > Also, It would be great if someone can help me understand or share any
> > documentation around the release processes (specifically on when we
> decide
> > to release a new bug fix version like the ones mentioned above.)
> >
> > Thanks,
> > Sankalp
> >
>


Re: Question around release plans for Apache Kafka 2.3.2 and 2.4.2

2020-06-24 Thread Ismael Juma
Hi Sankalp,

Is there a reason why you cannot upgrade to Apache Kafka 2.5.0 instead? We
are working on the 2.5.1 release, which would be the recommended release.

Ismael

On Wed, Jun 24, 2020 at 6:18 AM Sankalp Bhatia 
wrote:

> Hi All,
>
> I would like to know if there are any plans to release a 2.3.2 and 2.4.2
> versions for Apache Kafka in the near future. I see there are some issues
> marked as fixed in these two versions
> (https://tinyurl.com/ycdpz5cb).
> However, I could not find a branch/tag corresponding to these versions in
> the github repository (https://github.com/apache/kafka).
>
> Also, It would be great if someone can help me understand or share any
> documentation around the release processes (specifically on when we decide
> to release a new bug fix version like the ones mentioned above.)
>
> Thanks,
> Sankalp
>


Question around release plans for Apache Kafka 2.3.2 and 2.4.2

2020-06-24 Thread Sankalp Bhatia
Hi All,

I would like to know if there are any plans to release a 2.3.2 and 2.4.2
versions for Apache Kafka in the near future. I see there are some issues
marked as fixed in these two versions
(https://tinyurl.com/ycdpz5cb).
However, I could not find a branch/tag corresponding to these versions in
the github repository (https://github.com/apache/kafka).

Also, It would be great if someone can help me understand or share any
documentation around the release processes (specifically on when we decide
to release a new bug fix version like the ones mentioned above.)

Thanks,
Sankalp


Re: Retention policies question

2020-04-22 Thread Gwen Shapira
Kafka purges old messages from both leaders and replicas.

If there was a mistake in the book, can you tell me which chapter and page?
We'll fix it.

Gwen

On Wed, Apr 22, 2020, 7:51 AM Alex Bull  wrote:

> Hi, Dear Kafka Developers,
>
> I've read 'Kafka: The Definitive Guide' by Narkhede and others and I have a
> following question.
> On what side topic retention policies (delete or compact) are performed?
> I have a guess that they work only on  brokers that hold leader replica of
> partition.
> Or am I  wrong ?
>
> With best regards,
> Alex.
>


Retention policies question

2020-04-22 Thread Alex Bull
Hi, Dear Kafka Developers,

I've read 'Kafka: The Definitive Guide' by Narkhede and others, and I have the
following question.
On which side are topic retention policies (delete or compact) performed?
I have a guess that they work only on brokers that hold the leader replica of a
partition.
Or am I wrong?

With best regards,
Alex.


Re: Question about log flusher real frequency

2020-03-09 Thread Fares Oueslati
Hi Alexandre,

Thank you for your quick answer.

I want to monitor it because I'm trying to find out why our existing Kafka
cluster is configured to flush data every 10 milliseconds! (The people who
configured it are no longer available to ask.)

As that value seems really low to me, I was trying to understand and monitor
the flush behaviour.

Fares

On Mon, Mar 9, 2020 at 5:24 PM Alexandre Dupriez <
alexandre.dupr...@gmail.com> wrote:

> Hi Fares,
>
> On Linux kernels, you can use the property "dirty_writeback_centisecs"
> [1] to configure the period between executions of kswapd, which does
> this "sync" job. The period is usually set to 30 seconds.
> There are few exceptions where Kafka explicitly forces a sync (via the
> force() method from the I/O API of the JDK), e.g. when a segment is
> rolled or Kafka shutting down.
>
> The page writeback activity from your kernel is monitorable at
> different levels of granularity and depending on the instrumentation
> you are willing to use.
>
> Why would you want to monitor this activity in the first place? Do you
> want to know exactly *when* your data is on the disk?
>
> [1] https://www.kernel.org/doc/Documentation/sysctl/vm.txt
>
> Le lun. 9 mars 2020 à 15:58, Fares Oueslati  a
> écrit :
> >
> > Hello,
> >
> > By default, both log.flush.interval.ms and log.flush.interval.messages
> are
> > set to Long.MAX_VALUE.
> >
> > As I understand, it makes Kafka flush log to disk (fsync) only depends on
> > file system.
> >
> > Is there any simple way to monitor that frequency ?
> >
> > Is there a rule of thumb to estimate that value depending on the os ?
> >
> > Thank you guys !
> > Fares
>


Re: Question about log flusher real frequency

2020-03-09 Thread Alexandre Dupriez
Hi Fares,

On Linux kernels, you can use the property "dirty_writeback_centisecs"
[1] to configure the period between executions of kswapd, which does
this "sync" job. The period is usually set to 30 seconds.
There are a few exceptions where Kafka explicitly forces a sync (via the
force() method of the JDK I/O API), e.g. when a segment is rolled or Kafka is
shutting down.

The page writeback activity from your kernel is monitorable at
different levels of granularity and depending on the instrumentation
you are willing to use.

Why would you want to monitor this activity in the first place? Do you
want to know exactly *when* your data is on the disk?

[1] https://www.kernel.org/doc/Documentation/sysctl/vm.txt

Le lun. 9 mars 2020 à 15:58, Fares Oueslati  a écrit :
>
> Hello,
>
> By default, both log.flush.interval.ms and log.flush.interval.messages are
> set to Long.MAX_VALUE.
>
> As I understand, it makes Kafka flush log to disk (fsync) only depends on
> file system.
>
> Is there any simple way to monitor that frequency ?
>
> Is there a rule of thumb to estimate that value depending on the os ?
>
> Thank you guys !
> Fares


Question about log flusher real frequency

2020-03-09 Thread Fares Oueslati
Hello,

By default, both log.flush.interval.ms and log.flush.interval.messages are
set to Long.MAX_VALUE.

As I understand, it makes Kafka flush log to disk (fsync) only depends on
file system.

Is there any simple way to monitor that frequency ?

Is there a rule of thumb to estimate that value depending on the os ?

Thank you guys !
Fares
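
One way to observe the broker's actual flush rate is its log-flush metric over
JMX; a rough sketch (the JMX port and the exact MBean name are assumptions to
check against your broker's exposed metrics):

```
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class FlushRateProbe {
    public static void main(String[] args) throws Exception {
        // Assumes the broker was started with remote JMX enabled on port 9999.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            ObjectName flushStats = new ObjectName("kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs");
            // Count = total number of log flushes so far; sampling it over time
            // gives the effective flush frequency.
            Object count = mbs.getAttribute(flushStats, "Count");
            System.out.println("log flushes so far: " + count);
        } finally {
            connector.close();
        }
    }
}
```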


Re: question about offsetSync

2019-11-01 Thread Xu Jianhai
In my opinion,
condition 4 means downstream data loss because the master went down without
syncing in time, so the downstream producer sees a smaller downstreamOffset;
condition 3 means the upstream broker went down without syncing in time;
condition 1 is the initial state;
so perhaps condition 2 could be written as `downstreamTargetOffset -
lastSyncDownstreamOffset >= maxOffsetLag`, meaning the offset has not been
synced for a long time. Should we rewrite it?

On Sat, Nov 2, 2019 at 12:41 AM Xu Jianhai  wrote:

> Hi:
> I am engineer from China, I review kafka mirror latest impl, but I do
> not figure out the reason why PartitionState update like that:
> ```
>
> // true if we should emit an offset sync
> boolean update(long upstreamOffset, long downstreamOffset) {
> boolean shouldSyncOffsets = false;
> long upstreamStep = upstreamOffset - lastSyncUpstreamOffset;
> long downstreamTargetOffset = lastSyncDownstreamOffset + upstreamStep;
> if (lastSyncDownstreamOffset == -1L
> || downstreamOffset - downstreamTargetOffset >= maxOffsetLag
> || upstreamOffset - previousUpstreamOffset != 1L
> || downstreamOffset < previousDownstreamOffset) {
> lastSyncUpstreamOffset = upstreamOffset;
> lastSyncDownstreamOffset = downstreamOffset;
> shouldSyncOffsets = true;
> }
> previousUpstreamOffset = upstreamOffset;
> previousDownstreamOffset = downstreamOffset;
> return shouldSyncOffsets;
> }
> }
>
> ```
> I can not know why the condition is like that.
> 1. lastSyncDownstreamOffset == -1L: never sync, so call sync method
> 2. downstreamOffset - downstreamTargetOffset >= maxOffsetLag: offset is
> not accurate, so sync. but why use maxOffsetLag? why not >0: meaning not
> accurate
> 3. upstreamOffset - previousUpstreamOffset != 1L: meaning why?
> 4. downstreamOffset < previousDownstreamOffset: meaning why?
>
>
>
>
>
>
>


question about offsetSync

2019-11-01 Thread Xu Jianhai
Hi:
I am an engineer from China. I reviewed the latest Kafka MirrorMaker
implementation, but I cannot figure out why PartitionState is updated like this:
```

// true if we should emit an offset sync
boolean update(long upstreamOffset, long downstreamOffset) {
boolean shouldSyncOffsets = false;
long upstreamStep = upstreamOffset - lastSyncUpstreamOffset;
long downstreamTargetOffset = lastSyncDownstreamOffset + upstreamStep;
if (lastSyncDownstreamOffset == -1L
|| downstreamOffset - downstreamTargetOffset >= maxOffsetLag
|| upstreamOffset - previousUpstreamOffset != 1L
|| downstreamOffset < previousDownstreamOffset) {
lastSyncUpstreamOffset = upstreamOffset;
lastSyncDownstreamOffset = downstreamOffset;
shouldSyncOffsets = true;
}
previousUpstreamOffset = upstreamOffset;
previousDownstreamOffset = downstreamOffset;
return shouldSyncOffsets;
}
}

```
I can not know why the condition is like that.
1. lastSyncDownstreamOffset == -1L: never sync, so call sync method
2. downstreamOffset - downstreamTargetOffset >= maxOffsetLag: offset is not
accurate, so sync. but why use maxOffsetLag? why not >0: meaning not
accurate
3. upstreamOffset - previousUpstreamOffset != 1L: meaning why?
4. downstreamOffset < previousDownstreamOffset: meaning why?


Re: KIP-382 + Kafka Streams Question

2019-07-24 Thread Adam Bellemare
Hi Ryanne

> Lemme know if I haven't answered this clearly.

Nope, this was very helpful. Thank you!

> A single "stream" can come from multiple input topics
I overlooked that - I was thinking of simply using the
StreamBuilder.table() functionality instead, but that function doesn't
support a Collection of topics.

Since the topics would be copartitioned by definition, wouldn't the event
dispatcher in PartitionGroup (priorityQueue and streamtime ordering) ensure
that the topics are processed in incrementing streamtime order?

Alternately, I suppose this could be a case where it is a good idea to have
the timestamp of the event within the event's value payload, such that:
StreamsBuilder.stream(Set("userEntity", "primary.userEntity"))
.groupByKey()
.reduce()
can allow us to materialize the latest state for a given key.
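
To make that concrete, here is a minimal sketch of the merge-and-keep-latest idea. The topic names come from this thread; the UserEvent type, its embedded timestamp field, and the omitted serde configuration are assumptions for illustration only:

```
import java.util.Arrays;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class LatestUserEntityTable {
    // Hypothetical value type carrying the event timestamp inside the payload.
    public static class UserEvent {
        public long timestamp;
        public String emailAddress;
    }

    // Merge the local topic and its replicated counterpart into one stream, then
    // keep only the value with the newest embedded timestamp per key.
    public static KTable<String, UserEvent> build(StreamsBuilder builder) {
        return builder.<String, UserEvent>stream(Arrays.asList("userEntity", "primary.userEntity"))
                .groupByKey()
                .reduce((current, incoming) ->
                        incoming.timestamp >= current.timestamp ? incoming : current);
    }
}
```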

Thanks Ryanne, this has been a very helpful discussion for me. We are
prototyping the usage of MM2 internally at the moment in anticipation of
its release in 2.4 and want to ensure we have our replication + recovery
strategies sorted out.

Adam

On Tue, Jul 23, 2019 at 7:26 PM Ryanne Dolan  wrote:

> Adam, I think we are converging :)
>
> > "userEntity"...where I only want the latest emailAddress (basic
> materialization) to send an email on account password update.
>
> Yes, you want all "userEntity" data on both clusters. Each cluster will
> have "userEntity" and the remote counterpart
> "secondary/primary.userEntity", as in my example (1). The send-email part
> can run on either cluster (but not both, to avoid duplicate emails),
> subscribing to both "userEntity" and "secondary/primary.userEntity". For
> DR, you can migrate this app between clusters via offset translation and
> the kafka-streams-application-reset tool.
>
> Then, you want a materialize-email-table app running in _both_ clusters,
> so that the latest emails are readily available in RocksDB from either
> cluster. This also subscribes to both "userEntity" and
> "secondary/primary.userEntity" s.t. records originating from either cluster
> are processed.
>
> (Equivalently, send-email and materialize-email-table could be parts of
> the same Streams app, just configured differently, e.g. with send-email
> short-circuited in all but one cluster.)
>
> Under normal operation, your userEntity events are sent to the primary
> cluster (topic: userEntity), processed there via materialize-email-table
> and send-email, and replicated to the secondary cluster (topic:
> primary.userEntity) via MM2. When primary goes down, your producers
> (whatever is sending userEntity events) can failover to the secondary
> cluster (topic: userEntity). This can happen in real-time, i.e. as soon as
> the producer detects an outage or via a load balancer with healthchecks
> etc. So under normal operation, you have all userEntity events in both
> clusters, and both clusters are available for producing to.
>
> N.B. this is not dual-ingest, which would require you always produce
> directly to both clusters. It's active/active, b/c you can produce to
> either cluster at any point in time, and the effect is the same.
>
> > Q1) Where does the producer write its data to if the primary cluster is
> dead?
>
> With active/active like this, you can send to either cluster.
>
> > Q2) How does a Kafka Streams application materialize state from two
> topics?
>
> A Streams app can subscribe to multiple topics. A single "stream" can come
> from multiple input topics (see:
> https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.Collection-
> )
>
> Likewise, a KTable can be materialized from multiple source topics -- in
> this case, userEntity, primary.userEntity and/or secondary.userEntity. You
> can think of these as parts of a "virtual topic", as you described.
>
> > (loaded question, I know)
>
> There is one caveat I can think of: there is no ordering guarantee across
> different topics in the same stream, so materialization could be
> inconsistent between the two clusters if, say, the same users's email was
> changed to different values at the same millisecond in both clusters. This
> may or may not be a problem.
>
> > Q3) ... recommendations on how to handle replication/producing of
> entity-data (ie: userEntity) across multiple clusters...
>
> Lemme know if I haven't answered this clearly.
>
> Ryanne
>
> On Tue, Jul 23, 2019 at 1:03 PM Adam Bellemare 
> wrote:
>
>> Hi Ryanne
>>
>> Thanks for the clarifications! Here is one of my own, as I think it's the
>> biggest stumbling block in my description:
>>
>> *> What is "tabl

Re: KIP-382 + Kafka Streams Question

2019-07-23 Thread Ryanne Dolan
Adam, I think we are converging :)

> "userEntity"...where I only want the latest emailAddress (basic
materialization) to send an email on account password update.

Yes, you want all "userEntity" data on both clusters. Each cluster will
have "userEntity" and the remote counterpart
"secondary/primary.userEntity", as in my example (1). The send-email part
can run on either cluster (but not both, to avoid duplicate emails),
subscribing to both "userEntity" and "secondary/primary.userEntity". For
DR, you can migrate this app between clusters via offset translation and
the kafka-streams-application-reset tool.

Then, you want a materialize-email-table app running in _both_ clusters, so
that the latest emails are readily available in RocksDB from either
cluster. This also subscribes to both "userEntity" and
"secondary/primary.userEntity" s.t. records originating from either cluster
are processed.

(Equivalently, send-email and materialize-email-table could be parts of the
same Streams app, just configured differently, e.g. with send-email
short-circuited in all but one cluster.)

Under normal operation, your userEntity events are sent to the primary
cluster (topic: userEntity), processed there via materialize-email-table
and send-email, and replicated to the secondary cluster (topic:
primary.userEntity) via MM2. When primary goes down, your producers
(whatever is sending userEntity events) can failover to the secondary
cluster (topic: userEntity). This can happen in real-time, i.e. as soon as
the producer detects an outage or via a load balancer with healthchecks
etc. So under normal operation, you have all userEntity events in both
clusters, and both clusters are available for producing to.

N.B. this is not dual-ingest, which would require you always produce
directly to both clusters. It's active/active, b/c you can produce to
either cluster at any point in time, and the effect is the same.

> Q1) Where does the producer write its data to if the primary cluster is
dead?

With active/active like this, you can send to either cluster.

> Q2) How does a Kafka Streams application materialize state from two
topics?

A Streams app can subscribe to multiple topics. A single "stream" can come
from multiple input topics (see:
https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.Collection-
)

Likewise, a KTable can be materialized from multiple source topics -- in
this case, userEntity, primary.userEntity and/or secondary.userEntity. You
can think of these as parts of a "virtual topic", as you described.
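
A minimal sketch of that multi-topic subscription using a regex, assuming MM2's default "cluster.topic" naming so the same application code works unchanged in either cluster (serdes and the value type are left as defaults for brevity):

```
import java.util.regex.Pattern;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class VirtualTopicStream {
    // Matches "userEntity" plus any replicated counterpart such as
    // "primary.userEntity" or "secondary.userEntity".
    private static final Pattern USER_ENTITY_TOPICS =
            Pattern.compile("(primary\\.|secondary\\.)?userEntity");

    public static KStream<String, String> subscribe(StreamsBuilder builder) {
        return builder.stream(USER_ENTITY_TOPICS);
    }
}
```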

> (loaded question, I know)

There is one caveat I can think of: there is no ordering guarantee across
different topics in the same stream, so materialization could be
inconsistent between the two clusters if, say, the same user's email was
changed to different values at the same millisecond in both clusters. This
may or may not be a problem.

> Q3) ... recommendations on how to handle replication/producing of
entity-data (ie: userEntity) across multiple clusters...

Lemme know if I haven't answered this clearly.

Ryanne

On Tue, Jul 23, 2019 at 1:03 PM Adam Bellemare 
wrote:

> Hi Ryanne
>
> Thanks for the clarifications! Here is one of my own, as I think it's the
> biggest stumbling block in my description:
>
> *> What is "table" exactly? I am interpreting this as a KTable changelog
> topic*
> "table" is not a KTable changelog topic, but simply entity data that is to
> be materialized into a table - for example, relational data captured from
> Kafka Connect. I should have named this "stateful-data" or something less
> ambiguous and provided an explicit definition. Note that non-KStreams
> applications will also regularly use this entity data to materialize their
> own tables, but it in itself is not a KTable internal changelog.
>
> Per your example 1, let's name this topic "userEntity". It could be a
> (key,value) pair of (userId, emailAddress), where I only want the latest
> emailAddress (basic materialization) to send an email on account password
> update. I only want to run the application against one Kafka cluster, and
> because I don't want to use dual-ingest, I am running that application only
> on the cluster where the data is being sent (Primary Cluster). In a
> scenario where all replication is working correctly I could also run this
> off the Secondary cluster's replica, "primary.userEntity"
>
>
>
> *> Yes, that's something like "dual ingest", which I would not recommend.*
> Agreed. I do not want to use dual ingest.
>
> *> Secondary cluster:*
> *> Topics: events, primary.events, table-changelog*
> *> App subscription: events, primary.events*
> *> App output: table-changel

Re: KIP-382 + Kafka Streams Question

2019-07-23 Thread Adam Bellemare
Hi Ryanne

Thanks for the clarifications! Here is one of my own, as I think it's the
biggest stumbling block in my description:

*> What is "table" exactly? I am interpreting this as a KTable changelog
topic*
"table" is not a KTable changelog topic, but simply entity data that is to
be materialized into a table - for example, relational data captured from
Kafka Connect. I should have named this "stateful-data" or something less
ambiguous and provided an explicit definition. Note that non-KStreams
applications will also regularly use this entity data to materialize their
own tables, but it is not in itself a KTable internal changelog.

Per your example 1, let's name this topic "userEntity". It could be a
(key,value) pair of (userId, emailAddress), where I only want the latest
emailAddress (basic materialization) to send an email on account password
update. I only want to run the application against one Kafka cluster, and
because I don't want to use dual-ingest, I am running that application only
on the cluster where the data is being sent (Primary Cluster). In a
scenario where all replication is working correctly I could also run this
off the Secondary cluster's replica, "primary.userEntity"



*> Yes, that's something like "dual ingest", which I would not recommend.*
Agreed. I do not want to use dual ingest.

*> Secondary cluster:*
*> Topics: events, primary.events, table-changelog*
*> App subscription: events, primary.events*
*> App output: table-changelog*

Is the "events" topic dual ingest, since it exists in the Primary cluster
with the exact same name?

The whole scenario can be boiled down into the following:
1) Entity data is in a userEntity topic, ie: (userId, emailAddress)
2) I want to publish it into an Active-Active cluster setup without using
dual-ingest
3) I want to materialize the data into a single table for an application
consuming from a single cluster (Kafka Streams or not)
4) I want to be able to fail over and rebuild the materialized state using
the data I have replicated.
- If all of the entity data is produced to each cluster (dual-ingest) then
it is trivial to fail over and rebuild the materialized table.
- If the data is only produced to Primary and only replicated to Secondary,
at a failover I would need to consume from the replicated topic.
*Q1) Where does the producer write its data to if the primary cluster
is dead?*
It seems to me that it must then write its data to the only
remaining cluster. This would then put the entity data in two topics as I
had originally outlined, as below:
*Secondary Cluster: (Live)   (renamed table to userEntity)*
  Topic: "primary.userEntity" (contains data from T = 0 to T = n)
  Topic: "userEntity" (contains data from T = n+1 to now, the
failed-over producer)


*Q2) How does a Kafka Streams application materialize state from two
topics? (loaded question, I know)*
  Since I know this isn't built in, is there some sort of technique
or system that you use to allow for a single virtual topic made up of many
logical topics?

*Q3) Do you have any recommendations on how to handle replication/producing
of entity-data (ie: userEntity) across multiple clusters, such that an
application may correctly (or even near-correctly) materialize state after
a failover like the one I described above?*
This is really the golden question. We're currently developing our
Active-Passive approach, but we want to be prepared for scenarios where we
have multiple clusters with entity-replication between clusters.


Thanks Ryanne!


On Tue, Jul 23, 2019 at 12:39 PM Ryanne Dolan  wrote:

> Adam,
>
> > I think we have inconsistent definitions of Active-Active
>
> Yes, this terminology gets thrown around a lot. IMO "active" means both
> producers and consumers are using a cluster under normal operation -- not
> just during outages, and not just by something like MM2. (Obviously, MM2
> has producers and consumers, but they don't count here.) Conversely,
> "standby" or "backup" means that data is being written by a producer, but
> it isn't being consumed under normal operation. I qualify this definition
> with IMO, as I don't think there is strong consensus here.
>
> I'll also add a caveat about "under normal operation". An active/active
> architecture does not necessarily mean that you use both clusters in the
> same way all the time -- only that you _could_. You could load-balance
> 50/50 of your traffic between two clusters, or you could direct 100% to one
> and 0% to the other, e.g. if one is farther away or has less hw resources.
> But the architecture remains the same (and certainly, MM2 doesn't care
> about this detail).
>
> > The producer is only producing to one cluster (primary) and one topic
> (topic "

Re: KIP-382 + Kafka Streams Question

2019-07-23 Thread Ryanne Dolan
 cluster.". Wouldn't this mean that the
> stateful "table" topic that we wish to materialize would be replicated by
> MM2 from Primary, such that we end up with the following:
>
> *Replicated Entity/Stateful Data:*
> *Primary Cluster: (Live)*
> Topic: "table" (contains data from T = 0 to T = n)
> SteamsAppPrimary is consuming from ("table")
>
> *Secondary Cluster: (Live)*
> Topic: "primary.table" (contains data from T = 0 to T = n)
> SteamsAppSecondary is consuming from ("primary.table")
>
> What does StreamsAppSecondary do when "primary.table" is no longer
> replicated because Primary has died? Additionally, where should the
> producer of topic "table" now write its data to, assuming that Primary
> Cluster is irrevocably lost?
>
> I hope this better outlines my scenario. The trivial solution seems to be
> to make your producers produce all stateful data (topic "table") to each
> cluster, which makes MM2 unnecessary, but can also lead to data
> inconsistencies so it's not exactly foolproof.
>
> Thanks
>
> On Mon, Jul 22, 2019 at 6:32 PM Ryanne Dolan 
> wrote:
>
>> Hello Adam, thanks for the questions. Yes my organization uses Streams,
>> and yes you can use Streams with MM2/KIP-382, though perhaps not in the way
>> you are describing.
>>
>> The architecture you mention is more "active/standby" than
>> "active/active" IMO. The "secondary" cluster is not being used until a
>> failure, at which point you migrate your app and expect the data to already
>> be there. This works for normal consumers where you can seek() and
>> --reset-offsets. Streams apps can be reset with the
>> kafka-streams-application-reset tool, but as you point out, that doesn't
>> help with rebuilding an app's internal state, which would be missing on the
>> secondary cluster. (Granted, that may be okay depending on your particular
>> application.)
>>
>> A true "active/active" solution IMO would be to run your same Streams app
>> in _both_ clusters (primary, secondary), s.t. the entire application state
>> is available and continuously updated in both clusters. As with normal
>> consumers, the Streams app should subscribe to any remote topics, e.g. with
>> a regex, s.t. the application state will reflect input from either source
>> cluster.
>>
>> This is essentially what Streams' "standby replicas" are -- extra copies
>> of application state to support quicker failover. Without these replicas,
>> Streams would need to start back at offset 0 and re-process everything in
>> order to rebuild state (which you don't want to do during a disaster,
>> especially!). The same logic applies to using Streams with MM2. You _could_
>> failover by resetting the app and rebuilding all the missing state, or you
>> could have a copy of everything sitting there ready when you need it. The
>> easiest way to do the latter is to run your app in both clusters.
>>
>> Hope that helps.
>>
>> Ryanne
>>
>> On Mon, Jul 22, 2019 at 3:11 PM Adam Bellemare 
>> wrote:
>>
>>> Hi Ryanne
>>>
>>> I have a quick question for you about Active+Active replication and
>>> Kafka Streams. First, does your org /do you use Kafka Streams? If not then
>>> I think this conversation can end here. ;)
>>>
>>> Secondly, and for the broader Kafka Dev group - what happens if I want
>>> to use Active+Active replication with my Kafka Streams app, say, to
>>> materialize a simple KTable? Based on my understanding, I topic "table" on
>>> the primary cluster will be replicated to the secondary cluster as
>>> "primary.table". In the case of a full cluster failure for primary, the
>>> producer to topic "table" on the primary switches over to the secondary
>>> cluster, creates its own "table" topic and continues to write to there. So
>>> now, assuming we have had no data loss, we end up with:
>>>
>>>
>>> *Primary Cluster: (Dead)*
>>>
>>>
>>> *Secondary Cluster: (Live)*
>>> Topic: "primary.table" (contains data from T = 0 to T = n)
>>> Topic: "table" (contains data from T = n+1 to now)
>>>
>>> If I want to materialize state from using Kafka Streams, obviously I am
>>> now in a bit of a pickle since I need to consume "primary.table" before I
>>> consume "table". Have you encountered rebuilding state in Kafka Streams
>>> using Active-Active? For non-Kafka Streams I can see using a single
>>> consumer for "primary.table" and one for "table", interleaving the
>>> timestamps and performing basic event dispatching based on my own tracked
>>> stream-time, but for Kafka Streams I don't think there exists a solution to
>>> this.
>>>
>>> If you have any thoughts on this or some recommendations for Kafka
>>> Streams with Active-Active I would be very appreciative.
>>>
>>> Thanks
>>> Adam
>>>
>>>
>>>


Re: KIP-382 + Kafka Streams Question

2019-07-23 Thread Adam Bellemare
Hi Ryanne

I think we have inconsistent definitions of Active-Active. The producer is
only producing to one cluster (primary) and one topic (topic "table"), and
the other cluster (secondary) contains only a replication of the data via
MM2 ("primary.table"). What you seemed to be proposing is that the
producer's "table" data is sent fully to each cluster, such that the state
can be materialized as a KTable in each application running on each
cluster. This wouldn't require MM2 at all, so I'm not sure if this is what
you advocated.

You also state that "As with normal consumers, the Streams app should
*subscribe
to any remote topics*, e.g. with a regex, s.t. the application state will
reflect input from either source cluster.". Wouldn't this mean that the
stateful "table" topic that we wish to materialize would be replicated by
MM2 from Primary, such that we end up with the following:

*Replicated Entity/Stateful Data:*
*Primary Cluster: (Live)*
Topic: "table" (contains data from T = 0 to T = n)
SteamsAppPrimary is consuming from ("table")

*Secondary Cluster: (Live)*
Topic: "primary.table" (contains data from T = 0 to T = n)
SteamsAppSecondary is consuming from ("primary.table")

What does StreamsAppSecondary do when "primary.table" is no longer
replicated because Primary has died? Additionally, where should the
producer of topic "table" now write its data to, assuming that Primary
Cluster is irrevocably lost?

I hope this better outlines my scenario. The trivial solution seems to be
to make your producers produce all stateful data (topic "table") to each
cluster, which makes MM2 unnecessary, but can also lead to data
inconsistencies so it's not exactly foolproof.

Thanks

On Mon, Jul 22, 2019 at 6:32 PM Ryanne Dolan  wrote:

> Hello Adam, thanks for the questions. Yes my organization uses Streams,
> and yes you can use Streams with MM2/KIP-382, though perhaps not in the way
> you are describing.
>
> The architecture you mention is more "active/standby" than "active/active"
> IMO. The "secondary" cluster is not being used until a failure, at which
> point you migrate your app and expect the data to already be there. This
> works for normal consumers where you can seek() and --reset-offsets.
> Streams apps can be reset with the kafka-streams-application-reset tool,
> but as you point out, that doesn't help with rebuilding an app's internal
> state, which would be missing on the secondary cluster. (Granted, that may
> be okay depending on your particular application.)
>
> A true "active/active" solution IMO would be to run your same Streams app
> in _both_ clusters (primary, secondary), s.t. the entire application state
> is available and continuously updated in both clusters. As with normal
> consumers, the Streams app should subscribe to any remote topics, e.g. with
> a regex, s.t. the application state will reflect input from either source
> cluster.
>
> This is essentially what Streams' "standby replicas" are -- extra copies
> of application state to support quicker failover. Without these replicas,
> Streams would need to start back at offset 0 and re-process everything in
> order to rebuild state (which you don't want to do during a disaster,
> especially!). The same logic applies to using Streams with MM2. You _could_
> failover by resetting the app and rebuilding all the missing state, or you
> could have a copy of everything sitting there ready when you need it. The
> easiest way to do the latter is to run your app in both clusters.
>
> Hope that helps.
>
> Ryanne
>
> On Mon, Jul 22, 2019 at 3:11 PM Adam Bellemare 
> wrote:
>
>> Hi Ryanne
>>
>> I have a quick question for you about Active+Active replication and Kafka
>> Streams. First, does your org /do you use Kafka Streams? If not then I
>> think this conversation can end here. ;)
>>
>> Secondly, and for the broader Kafka Dev group - what happens if I want to
>> use Active+Active replication with my Kafka Streams app, say, to
>> materialize a simple KTable? Based on my understanding, I topic "table" on
>> the primary cluster will be replicated to the secondary cluster as
>> "primary.table". In the case of a full cluster failure for primary, the
>> producer to topic "table" on the primary switches over to the secondary
>> cluster, creates its own "table" topic and continues to write to there. So
>> now, assuming we have had no data loss, we end up with:
>>
>>
>> *Primary Cluster: (Dead)*
>>
>>
>> *Secondary Cluster: (Live)*
>> Topic: "primary.table" (contains data from T = 0 to T = n)
>> To

Re: KIP-382 + Kafka Streams Question

2019-07-22 Thread Ryanne Dolan
Hello Adam, thanks for the questions. Yes my organization uses Streams, and
yes you can use Streams with MM2/KIP-382, though perhaps not in the way you
are describing.

The architecture you mention is more "active/standby" than "active/active"
IMO. The "secondary" cluster is not being used until a failure, at which
point you migrate your app and expect the data to already be there. This
works for normal consumers where you can seek() and --reset-offsets.
Streams apps can be reset with the kafka-streams-application-reset tool,
but as you point out, that doesn't help with rebuilding an app's internal
state, which would be missing on the secondary cluster. (Granted, that may
be okay depending on your particular application.)

A true "active/active" solution IMO would be to run your same Streams app
in _both_ clusters (primary, secondary), s.t. the entire application state
is available and continuously updated in both clusters. As with normal
consumers, the Streams app should subscribe to any remote topics, e.g. with
a regex, s.t. the application state will reflect input from either source
cluster.

This is essentially what Streams' "standby replicas" are -- extra copies of
application state to support quicker failover. Without these replicas,
Streams would need to start back at offset 0 and re-process everything in
order to rebuild state (which you don't want to do during a disaster,
especially!). The same logic applies to using Streams with MM2. You _could_
failover by resetting the app and rebuilding all the missing state, or you
could have a copy of everything sitting there ready when you need it. The
easiest way to do the latter is to run your app in both clusters.
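
As a side note, the in-cluster analogue mentioned here is a one-line Streams setting. A minimal sketch with placeholder application id and bootstrap servers; it covers instance failover within a single cluster, not the cross-cluster case discussed above:

```
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StandbyReplicaConfig {
    public static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "materialize-email-table"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "primary-kafka:9092");   // placeholder
        // Keep one warm copy of each task's state on another instance for faster failover.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        return props;
    }
}
```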

Hope that helps.

Ryanne

On Mon, Jul 22, 2019 at 3:11 PM Adam Bellemare 
wrote:

> Hi Ryanne
>
> I have a quick question for you about Active+Active replication and Kafka
> Streams. First, does your org /do you use Kafka Streams? If not then I
> think this conversation can end here. ;)
>
> Secondly, and for the broader Kafka Dev group - what happens if I want to
> use Active+Active replication with my Kafka Streams app, say, to
> materialize a simple KTable? Based on my understanding, I topic "table" on
> the primary cluster will be replicated to the secondary cluster as
> "primary.table". In the case of a full cluster failure for primary, the
> producer to topic "table" on the primary switches over to the secondary
> cluster, creates its own "table" topic and continues to write to there. So
> now, assuming we have had no data loss, we end up with:
>
>
> *Primary Cluster: (Dead)*
>
>
> *Secondary Cluster: (Live)*
> Topic: "primary.table" (contains data from T = 0 to T = n)
> Topic: "table" (contains data from T = n+1 to now)
>
> If I want to materialize state from using Kafka Streams, obviously I am
> now in a bit of a pickle since I need to consume "primary.table" before I
> consume "table". Have you encountered rebuilding state in Kafka Streams
> using Active-Active? For non-Kafka Streams I can see using a single
> consumer for "primary.table" and one for "table", interleaving the
> timestamps and performing basic event dispatching based on my own tracked
> stream-time, but for Kafka Streams I don't think there exists a solution to
> this.
>
> If you have any thoughts on this or some recommendations for Kafka Streams
> with Active-Active I would be very appreciative.
>
> Thanks
> Adam
>
>
>


KIP-382 + Kafka Streams Question

2019-07-22 Thread Adam Bellemare
Hi Ryanne

I have a quick question for you about Active+Active replication and Kafka
Streams. First, does your org /do you use Kafka Streams? If not then I
think this conversation can end here. ;)

Secondly, and for the broader Kafka Dev group - what happens if I want to
use Active+Active replication with my Kafka Streams app, say, to
materialize a simple KTable? Based on my understanding, a topic "table" on
the primary cluster will be replicated to the secondary cluster as
"primary.table". In the case of a full cluster failure for primary, the
producer to topic "table" on the primary switches over to the secondary
cluster, creates its own "table" topic and continues to write to there. So
now, assuming we have had no data loss, we end up with:


*Primary Cluster: (Dead)*


*Secondary Cluster: (Live)*
Topic: "primary.table" (contains data from T = 0 to T = n)
Topic: "table" (contains data from T = n+1 to now)

If I want to materialize state using Kafka Streams, obviously I am now
in a bit of a pickle since I need to consume "primary.table" before I
consume "table". Have you encountered rebuilding state in Kafka Streams
using Active-Active? For non-Kafka Streams I can see using a single
consumer for "primary.table" and one for "table", interleaving the
timestamps and performing basic event dispatching based on my own tracked
stream-time, but for Kafka Streams I don't think there exists a solution to
this.

If you have any thoughts on this or some recommendations for Kafka Streams
with Active-Active I would be very appreciative.

Thanks
Adam


Question about integrating kafka broker with a service

2019-05-16 Thread Zhou, Thomas
Hi,

I am one of the Kafka users and I have a question about how to integrate our 
service with Kafka. Basically, we want to enable TLS for Kafka, and we want to 
enable mutual authentication using an SSL context. We've already got a service that 
will sign the certs and manage the keys. Our goal is to have both the Kafka broker 
side and the client side integrate with this service so people will not need to worry 
about rotating keys and other such tasks. I know that this part of Kafka's design is 
not pluggable, but can I get some advice about how difficult it would be to make Kafka 
pluggable with a service like the one I mentioned?
I would really appreciate it if you could give some advice.


Thanks & Regards,
Thomas
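
For context on the moving parts involved, these are the standard client-side settings for mutual TLS with Kafka; a key-management service like the one described would mainly need to keep the keystore and truststore files (or their passwords) refreshed. All paths and passwords below are placeholders, and the broker side would additionally need ssl.client.auth=required to enforce mutual authentication:

```
import java.util.Properties;

public class MutualTlsClientConfig {
    public static Properties sslProperties() {
        Properties props = new Properties();
        props.put("security.protocol", "SSL");
        // Truststore: which broker certificates the client trusts.
        props.put("ssl.truststore.location", "/etc/kafka/secrets/client.truststore.jks"); // placeholder path
        props.put("ssl.truststore.password", "changeit");                                 // placeholder
        // Keystore: the client certificate presented for mutual authentication.
        props.put("ssl.keystore.location", "/etc/kafka/secrets/client.keystore.jks");     // placeholder path
        props.put("ssl.keystore.password", "changeit");                                   // placeholder
        props.put("ssl.key.password", "changeit");                                        // placeholder
        return props;
    }
}
```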


Re: Question on performance data for Kafka vs NATS

2019-03-22 Thread Adam Bellemare
One more thing to note:

You are looking at regular, base NATS. On its own, it is not a direct 1-1
comparison to Kafka because it lacks things like data retention, clustering
and replication. Instead, you would want to compare it to NATS-Streaming, (
https://github.com/nats-io/nats-streaming-server ). You can find a number
of more recent articles and comparisons by a simple web search.

With that being said, this is likely not the best venue for an in-depth
discussion on tradeoffs between the two (especially since I see you're
spanning two very large mailing lists).

Adam




On Fri, Mar 22, 2019 at 1:34 AM Hans Jespersen  wrote:

> Thats a 4.5 year old benchmark and it was run with a single broker node
> and only 1 producer and 1 consumer all running on a single MacBookPro.
> Definitely not the target production environment for Kafka.
>
> -hans
>
> > On Mar 21, 2019, at 11:43 AM, M. Manna  wrote:
> >
> > HI All,
> >
> > https://nats.io/about/
> >
> > this shows a general comparison of sender/receiver throughputs for NATS
> and
> > other messaging system including our favourite Kafka.
> >
> > It appears that Kafka, despite taking the 2nd place, has a very low
> > throughput. My question is, where does Kafka win over NATS? is it the
> > unique partitioning and delivery semantics? Or, is it something else.
> >
> > From what I can see, NATS has traditional pub/sub and queuing. But it
> > doesn't look like there is any proper retention system built for this.
> >
> > Has anyone come across this already?
> >
> > Thanks,
>


Re: Question on performance data for Kafka vs NATS

2019-03-21 Thread Hans Jespersen
That's a 4.5-year-old benchmark and it was run with a single broker node and 
only 1 producer and 1 consumer all running on a single MacBookPro. Definitely 
not the target production environment for Kafka. 

-hans

> On Mar 21, 2019, at 11:43 AM, M. Manna  wrote:
> 
> HI All,
> 
> https://nats.io/about/
> 
> this shows a general comparison of sender/receiver throughputs for NATS and
> other messaging system including our favourite Kafka.
> 
> It appears that Kafka, despite taking the 2nd place, has a very low
> throughput. My question is, where does Kafka win over NATS? is it the
> unique partitioning and delivery semantics? Or, is it something else.
> 
> From what I can see, NATS has traditional pub/sub and queuing. But it
> doesn't look like there is any proper retention system built for this.
> 
> Has anyone come across this already?
> 
> Thanks,


Question on performance data for Kafka vs NATS

2019-03-21 Thread M. Manna
HI All,

https://nats.io/about/

this shows a general comparison of sender/receiver throughputs for NATS and
other messaging systems, including our favourite Kafka.

It appears that Kafka, despite taking 2nd place, has a very low
throughput. My question is: where does Kafka win over NATS? Is it the
unique partitioning and delivery semantics? Or is it something else?

From what I can see, NATS has traditional pub/sub and queuing. But it
doesn't look like there is any proper retention system built for this.

Has anyone come across this already?

Thanks,


Kafka Topic Volume and (possibly ACL) question

2019-02-18 Thread M. Manna
Hello,

We have a requirement where, based on business requirements, we need to
publish data only for a specific set of clients. For example, an invoice
update shouldn't go to all clients, only the specific client. But a company
remittance info should be published to all clients. Also, in some cases, a
specific client changes some contract info which is published in a P2P
fashion. We have about 8k clients.

What is the ideal way to control this flow?

1) specific topic per client
2) Some form of ACL?

For option 1, we are not 100% sure whether Kafka can handle 8k topics (or the
resource issues, for that matter). Has anyone solved a similar business
problem? If so, would you mind sharing your solution?

Btw, we are not using stream platform, it's simply pub-sub. Because we
don't need real-time aggregation of various items. For us, it's key that
the synchronisation occurs, and has "exactly-once" semantics.

Thanks,
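
On the ACL side of option 2), per-client read access to a per-client (or shared) topic can be granted programmatically through the Admin API. A minimal sketch with placeholder principal, topic name, and bootstrap servers; this only illustrates the mechanics, it is not a recommendation between the two options:

```
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class PerClientAclExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // Allow the client principal to read its own topic (names are placeholders).
            ResourcePattern topic =
                    new ResourcePattern(ResourceType.TOPIC, "invoices-client-42", PatternType.LITERAL);
            AccessControlEntry allowRead =
                    new AccessControlEntry("User:client-42", "*", AclOperation.READ, AclPermissionType.ALLOW);
            admin.createAcls(Collections.singleton(new AclBinding(topic, allowRead))).all().get();
        }
    }
}
```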


Re: [Discuss] Question on KIP-298: Error Handling in Kafka Connect

2019-02-06 Thread Randall Hauch
Hi, Pere.

The primary reason that KIP-298 did not support a DLQ for source connectors
was that we couldn't get around serialization problems. With source
connectors, the converter (serializer) is the last element in the chain,
and if there is a problem serializing a record then we could not work out
how to serialize the unserializable record so that it could be written to the
DLQ. Another problem was the inability to write into Kafka, at which point
it's likely we cannot write to the DLQ topic, either.

Randall
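
For readers landing on this thread, the sink-side error-handling settings that KIP-298 did add look roughly like the following, expressed here as a Java map for a hypothetical connector; the topic name and replication factor are placeholders:

```
import java.util.HashMap;
import java.util.Map;

public class SinkDlqConfig {
    public static Map<String, String> errorHandlingProps() {
        Map<String, String> props = new HashMap<>();
        // Tolerate bad records instead of failing the task.
        props.put("errors.tolerance", "all");
        // Route failed records to a dead letter queue topic (sink connectors only).
        props.put("errors.deadletterqueue.topic.name", "my-connector-dlq");      // placeholder topic
        props.put("errors.deadletterqueue.topic.replication.factor", "3");       // placeholder
        props.put("errors.deadletterqueue.context.headers.enable", "true");
        // Also log the failures for visibility.
        props.put("errors.log.enable", "true");
        props.put("errors.log.include.messages", "true");
        return props;
    }
}
```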

On Tue, Jan 1, 2019 at 2:18 PM Pere Urbón Bayes 
wrote:

> Hi,
>  a quick question on the KIP-298 Dead letter queue, as I read from the KIP
> is only available for the Sink connectors.
>
> While I know the challenges of defining a dead-letter queue for the
> incoming messages, I wanted ask/discuss what is the sense in here for this,
> do you completely discard the option?
>
> I sort of see it useful for messages that where pulled from the source, but
> somehow could not be ingested in Kafka, might be because of serialisation
> for example.
>
> What do you think?
>
> --
> Pere Urbon-Bayes
> Software Architect
> http://www.purbon.com
> https://twitter.com/purbon
> https://www.linkedin.com/in/purbon/
>


[Discuss] Question on KIP-298: Error Handling in Kafka Connect

2019-01-01 Thread Pere Urbón Bayes
Hi,
 a quick question on the KIP-298 Dead letter queue, as I read from the KIP
is only available for the Sink connectors.

While I know the challenges of defining a dead-letter queue for the
incoming messages, I wanted ask/discuss what is the sense in here for this,
do you completely discard the option?

I sort of see it useful for messages that where pulled from the source, but
somehow could not be ingested in Kafka, might be because of serialisation
for example.

What do you think?

-- 
Pere Urbon-Bayes
Software Architect
http://www.purbon.com
https://twitter.com/purbon
https://www.linkedin.com/in/purbon/


Re: A question about kafka streams API

2018-09-18 Thread John Roesler
Hey Yui,

Sorry, I haven't had a chance to respond. I've got a pretty busy couple of
weeks coming up, so I don't know when I'll look at this, but I find this
puzzling. I'll save your email and try what you said to see if I can figure
it out. Thanks for the repro code.

Let me know if you figure it out. Also, if you think you've found a bug,
feel free to file a jira ticket as well. It might get broader visibility
that way.

Thanks,
-John

On Thu, Sep 13, 2018 at 1:57 AM Yui Yoi  wrote:

> Hi Adam and John, thank you for your effort!
> We are implementing full idem-potency in our projects so that's nothing to
> worry about.
> As to what John said - we only have one partition, I personally assured
> that.
> So as i wrote in section 2. of my first message in this conversation - my
> stream should have processed the "asd" message again because it is not
> committed yet.
> That's why i suspect it has something to do with the stream's cache; maybe
> something like:
> 1. "asd" got processed and restored in cache
> 2. "{}" got processed and cached too.
> 3. commit interval makes the stream commit the offset of "{}"
>
>
> B.t.w
> If you want to run my application you should:
> 1. open it in some java editor as maven project
> 2. run it as a normal java application
> 3. setup kafka server & zookeeper on localhost
> 4. then you can send the above messages via cli
>
> John - even if you send "asd1", "asd2", "asd3" you will see in the logs
> that my app takes the latest each time
>
> Of course that's far beyond what i can ask from you guys to do, thanks a
> lot for your help.
>
> On Wed, Sep 12, 2018 at 8:14 PM John Roesler  wrote:
>
> > Hi!
> >
> > As Adam said, if you throw an exception during processing, it should
> cause
> > Streams to shut itself down and *not* commit that message. Therefore,
> when
> > you start up again, it should again attempt to process that same message
> > (and shut down again).
> >
> > Within a single partition, messages are processed in order, so a bad
> > message will block the queue, and you should not see subsequent messages
> > get processed.
> >
> > However, if your later message "{}" goes to a different partition than
> the
> > bad message, then there's no relationship between them, and the later,
> > good, message might get processed.
> >
> > Does that help?
> > -John
> >
> > On Wed, Sep 12, 2018 at 8:38 AM Adam Bellemare  >
> > wrote:
> >
> > > Hi Yui Yoi
> > >
> > >
> > > Keep in mind that Kafka Consumers don't traditionally request only a
> > single
> > > message at a time, but instead requests them in batches. This allows
> for
> > > much higher throughput, but does result in the scenario of
> > "at-least-once"
> > > processing. Generally what will happen in this scenario is the
> following:
> > >
> > > 1) Client requests the next set of messages from offset (t). For
> example,
> > > assume it gets 10 messages and message 6 is "bad".
> > > 2) The client's processor will then process the messages one at a time.
> > > Note that the offsets are not committed after the message is processed,
> > but
> > > only at the end of the batch.
> > > 3) The bad message it hit by the processor. At this point you can
> decide
> > to
> > > skip the message, throw an exception, etc.
> > > 4a) If you decide to skip the message, processing will continue. Once
> all
> > > 10 messages are processed, the new offset (t+10) offset is committed
> back
> > > to Kafka.
> > > 4b) If you decide to throw an exception and terminate your app, you
> will
> > > have still processed the messages that came before the bad message.
> > Because
> > > the offset (t+10) is not committed, the next time you start the app it
> > will
> > > consume from offset t, and those messages will be processed again. This
> > is
> > > "at-least-once" processing.
> > >
> > >
> > > Now, if you need exactly-once processing, you have two choices -
> > > 1) Use Kafka Streams with exactly-once semantics (though, as I am not
> > > familiar with your framework, it may support it as well).
> > > 2) Use idempotent practices (ie: it doesn't matter if the same messages
> > get
> > > processed more than once).
> > >
> > >
> > > Hope this helps -
> > >
> > > Adam
> > >
> > >
> > > On Wed, Sep 12, 2018

Re: A question about kafka streams API

2018-09-13 Thread Yui Yoi
Hi Adam and John, thank you for your effort!
We are implementing full idempotency in our projects so that's nothing to
worry about.
As to what John said - we only have one partition; I personally verified
that.
So as I wrote in section 2. of my first message in this conversation - my
stream should have processed the "asd" message again because it is not
committed yet.
That's why I suspect it has something to do with the stream's cache; maybe
something like:
1. "asd" got processed and stored in the cache
2. "{}" got processed and cached too.
3. the commit interval makes the stream commit the offset of "{}"


B.t.w
If you want to run my application you should:
1. open it in some java editor as maven project
2. run it as a normal java application
3. setup kafka server & zookeeper on localhost
4. then you can send the above messages via cli

John - even if you send "asd1", "asd2", "asd3" you will see in the logs
that my app takes the latest each time

Of course that's far beyond what I can ask of you guys to do; thanks a
lot for your help.

On Wed, Sep 12, 2018 at 8:14 PM John Roesler  wrote:

> Hi!
>
> As Adam said, if you throw an exception during processing, it should cause
> Streams to shut itself down and *not* commit that message. Therefore, when
> you start up again, it should again attempt to process that same message
> (and shut down again).
>
> Within a single partition, messages are processed in order, so a bad
> message will block the queue, and you should not see subsequent messages
> get processed.
>
> However, if your later message "{}" goes to a different partition than the
> bad message, then there's no relationship between them, and the later,
> good, message might get processed.
>
> Does that help?
> -John
>
> On Wed, Sep 12, 2018 at 8:38 AM Adam Bellemare 
> wrote:
>
> > Hi Yui Yoi
> >
> >
> > Keep in mind that Kafka Consumers don't traditionally request only a
> single
> > message at a time, but instead requests them in batches. This allows for
> > much higher throughput, but does result in the scenario of
> "at-least-once"
> > processing. Generally what will happen in this scenario is the following:
> >
> > 1) Client requests the next set of messages from offset (t). For example,
> > assume it gets 10 messages and message 6 is "bad".
> > 2) The client's processor will then process the messages one at a time.
> > Note that the offsets are not committed after the message is processed,
> but
> > only at the end of the batch.
> > 3) The bad message it hit by the processor. At this point you can decide
> to
> > skip the message, throw an exception, etc.
> > 4a) If you decide to skip the message, processing will continue. Once all
> > 10 messages are processed, the new offset (t+10) offset is committed back
> > to Kafka.
> > 4b) If you decide to throw an exception and terminate your app, you will
> > have still processed the messages that came before the bad message.
> Because
> > the offset (t+10) is not committed, the next time you start the app it
> will
> > consume from offset t, and those messages will be processed again. This
> is
> > "at-least-once" processing.
> >
> >
> > Now, if you need exactly-once processing, you have two choices -
> > 1) Use Kafka Streams with exactly-once semantics (though, as I am not
> > familiar with your framework, it may support it as well).
> > 2) Use idempotent practices (ie: it doesn't matter if the same messages
> get
> > processed more than once).
> >
> >
> > Hope this helps -
> >
> > Adam
> >
> >
> > On Wed, Sep 12, 2018 at 7:59 AM, Yui Yoi  wrote:
> >
> > > Hi Adam,
> > > Thanks a lot for the rapid response, it did helped!
> > >
> > > Let me though ask one more simple question: Can I make a stream
> > application
> > > stuck on an invalid message? and not consuming any further messages?
> > >
> > > Thanks again
> > >
> > > On Wed, Sep 12, 2018 at 2:35 PM Adam Bellemare <
> adam.bellem...@gmail.com
> > >
> > > wrote:
> > >
> > > > Hi Yui Yoi
> > > >
> > > > Preface: I am not familiar with the spring framework.
> > > >
> > > > "Earliest" when it comes to consuming from Kafka means, "Start
> reading
> > > from
> > > > the first message in the topic, *if there is no offset stored for
> that
> > > > consumer group*". It sounds like you are expecting it to re-read each
> > > > message 

Re: A question about kafka streams API

2018-09-12 Thread John Roesler
Hi!

As Adam said, if you throw an exception during processing, it should cause
Streams to shut itself down and *not* commit that message. Therefore, when
you start up again, it should again attempt to process that same message
(and shut down again).

Within a single partition, messages are processed in order, so a bad
message will block the queue, and you should not see subsequent messages
get processed.

However, if your later message "{}" goes to a different partition than the
bad message, then there's no relationship between them, and the later,
good, message might get processed.

Does that help?
-John
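
Related to the "can I make the application stuck on an invalid message" question earlier in this thread: if the bad record fails at deserialization, the handler below makes Streams stop rather than skip it. A minimal sketch with placeholder application id and bootstrap servers; whether it applies depends on where the failure actually happens in the topology:

```
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;

public class FailFastStreamsConfig {
    public static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-group");        // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Stop the application on a record that cannot be deserialized,
        // instead of logging and skipping it.
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                LogAndFailExceptionHandler.class);
        return props;
    }
}
```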

On Wed, Sep 12, 2018 at 8:38 AM Adam Bellemare 
wrote:

> Hi Yui Yoi
>
>
> Keep in mind that Kafka Consumers don't traditionally request only a single
> message at a time, but instead requests them in batches. This allows for
> much higher throughput, but does result in the scenario of "at-least-once"
> processing. Generally what will happen in this scenario is the following:
>
> 1) Client requests the next set of messages from offset (t). For example,
> assume it gets 10 messages and message 6 is "bad".
> 2) The client's processor will then process the messages one at a time.
> Note that the offsets are not committed after the message is processed, but
> only at the end of the batch.
> 3) The bad message it hit by the processor. At this point you can decide to
> skip the message, throw an exception, etc.
> 4a) If you decide to skip the message, processing will continue. Once all
> 10 messages are processed, the new offset (t+10) offset is committed back
> to Kafka.
> 4b) If you decide to throw an exception and terminate your app, you will
> have still processed the messages that came before the bad message. Because
> the offset (t+10) is not committed, the next time you start the app it will
> consume from offset t, and those messages will be processed again. This is
> "at-least-once" processing.
>
>
> Now, if you need exactly-once processing, you have two choices -
> 1) Use Kafka Streams with exactly-once semantics (though, as I am not
> familiar with your framework, it may support it as well).
> 2) Use idempotent practices (ie: it doesn't matter if the same messages get
> processed more than once).
>
>
> Hope this helps -
>
> Adam
>
>
> On Wed, Sep 12, 2018 at 7:59 AM, Yui Yoi  wrote:
>
> > Hi Adam,
> > Thanks a lot for the rapid response, it did helped!
> >
> > Let me though ask one more simple question: Can I make a stream
> application
> > stuck on an invalid message? and not consuming any further messages?
> >
> > Thanks again
> >
> > On Wed, Sep 12, 2018 at 2:35 PM Adam Bellemare  >
> > wrote:
> >
> > > Hi Yui Yoi
> > >
> > > Preface: I am not familiar with the spring framework.
> > >
> > > "Earliest" when it comes to consuming from Kafka means, "Start reading
> > from
> > > the first message in the topic, *if there is no offset stored for that
> > > consumer group*". It sounds like you are expecting it to re-read each
> > > message whenever a new message comes in. This is not going to happen,
> as
> > > there will be a committed offset and "earliest" will no longer be used.
> > If
> > > you were to use "latest" instead, if a consumer is started that does
> not
> > > have a valid offset, it would use the very latest message in the topic
> as
> > > the starting offset for message consumption.
> > >
> > > Now, if you are using the same consumer group each time you run the
> > > application (which it seems is true, as you have "test-group" hardwired
> > in
> > > your application.yml), but you do not tear down your local cluster and
> > > clear out its state, you will indeed see the behaviour you describe.
> > > Remember that Kafka is durable, and maintains the offsets when the
> > > individual applications go away. So you are probably seeing this:
> > >
> > > 1) start application instance 1. It realizes it has no offset when it
> > tries
> > > to register as a consumer on the input topic, so it creates a new
> > consumer
> > > entry for "earliest" for your consumer group.
> > > 2) send message "asd"
> > > 3) application instance 1 receives "asd", processes it, and updates the
> > > offset (offset head = 1)
> > > 4) Terminate instance 1
> > > 5) Start application instance 2. It detects correctly that consumer
> group
> > > "test-group" is available and reads that offset as its starting po

Re: A question about kafka streams API

2018-09-12 Thread Adam Bellemare
Hi Yui Yoi


Keep in mind that Kafka Consumers don't traditionally request only a single
message at a time, but instead request them in batches. This allows for
much higher throughput, but does result in the scenario of "at-least-once"
processing. Generally what will happen in this scenario is the following:

1) Client requests the next set of messages from offset (t). For example,
assume it gets 10 messages and message 6 is "bad".
2) The client's processor will then process the messages one at a time.
Note that the offsets are not committed after the message is processed, but
only at the end of the batch.
3) The bad message is hit by the processor. At this point you can decide to
skip the message, throw an exception, etc.
4a) If you decide to skip the message, processing will continue. Once all
10 messages are processed, the new offset (t+10) offset is committed back
to Kafka.
4b) If you decide to throw an exception and terminate your app, you will
have still processed the messages that came before the bad message. Because
the offset (t+10) is not committed, the next time you start the app it will
consume from offset t, and those messages will be processed again. This is
"at-least-once" processing.


Now, if you need exactly-once processing, you have two choices -
1) Use Kafka Streams with exactly-once semantics (though, as I am not
familiar with your framework, it may support it as well).
2) Use idempotent practices (ie: it doesn't matter if the same messages get
processed more than once).
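
A minimal plain-consumer sketch of the batch-then-commit flow described in 1) through 4b) above; the topic, group id, and bootstrap servers are placeholders, and a processing failure simply propagates out of the loop so the batch is never committed:

```
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AtLeastOnceLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");              // placeholder
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("input-topic"));     // placeholder topic
            while (true) {
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : batch) {
                    process(record); // a failure here exits before commitSync()
                }
                consumer.commitSync(); // offsets advance only after the whole batch succeeded
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }
}
```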


Hope this helps -

Adam


On Wed, Sep 12, 2018 at 7:59 AM, Yui Yoi  wrote:

> Hi Adam,
> Thanks a lot for the rapid response, it did helped!
>
> Let me though ask one more simple question: Can I make a stream application
> stuck on an invalid message? and not consuming any further messages?
>
> Thanks again
>
> On Wed, Sep 12, 2018 at 2:35 PM Adam Bellemare 
> wrote:
>
> > Hi Yui Yoi
> >
> > Preface: I am not familiar with the spring framework.
> >
> > "Earliest" when it comes to consuming from Kafka means, "Start reading
> from
> > the first message in the topic, *if there is no offset stored for that
> > consumer group*". It sounds like you are expecting it to re-read each
> > message whenever a new message comes in. This is not going to happen, as
> > there will be a committed offset and "earliest" will no longer be used.
> If
> > you were to use "latest" instead, if a consumer is started that does not
> > have a valid offset, it would use the very latest message in the topic as
> > the starting offset for message consumption.
> >
> > Now, if you are using the same consumer group each time you run the
> > application (which it seems is true, as you have "test-group" hardwired
> in
> > your application.yml), but you do not tear down your local cluster and
> > clear out its state, you will indeed see the behaviour you describe.
> > Remember that Kafka is durable, and maintains the offsets when the
> > individual applications go away. So you are probably seeing this:
> >
> > 1) start application instance 1. It realizes it has no offset when it
> tries
> > to register as a consumer on the input topic, so it creates a new
> consumer
> > entry for "earliest" for your consumer group.
> > 2) send message "asd"
> > 3) application instance 1 receives "asd", processes it, and updates the
> > offset (offset head = 1)
> > 4) Terminate instance 1
> > 5) Start application instance 2. It detects correctly that consumer group
> > "test-group" is available and reads that offset as its starting point.
> > 6) send message "{}"
> > 7) application instance 2 receives "{}", processes it, and updates the
> > offset (offset head = 2)
> > *NOTE:* App instance 2 NEVER received "asd", nor should it, as it is
> > telling the Kafka cluster that it belongs to the same consumer group as
> > application 1.
> >
> > Hope this helps,
> >
> > Adam
> >
> >
> >
> >
> >
> > On Wed, Sep 12, 2018 at 6:57 AM, Yui Yoi  wrote:
> >
> > > TL;DR:
> > > my streams application skips uncommitted messages
> > >
> > > Hello,
> > > I'm using streams API via spring framework and experiencing a weird
> > > behavior which I would like to get an explanation to:
> > > First of all: The attached zip is my test project, I used kafka cli to
> > run
> > > a localhost broker and zookeeper
> > >
> > > what is happening is as follows:
> > > 1. I send an invalid message, such as "asd", and m

Re: A question about kafka streams API

2018-09-12 Thread Yui Yoi
Hi Adam,
Thanks a lot for the rapid response, it did helped!

Let me though ask one more simple question: Can I make a stream application
stuck on an invalid message? and not consuming any further messages?

Thanks again

On Wed, Sep 12, 2018 at 2:35 PM Adam Bellemare 
wrote:

> Hi Yui Yoi
>
> Preface: I am not familiar with the spring framework.
>
> "Earliest" when it comes to consuming from Kafka means, "Start reading from
> the first message in the topic, *if there is no offset stored for that
> consumer group*". It sounds like you are expecting it to re-read each
> message whenever a new message comes in. This is not going to happen, as
> there will be a committed offset and "earliest" will no longer be used. If
> you were to use "latest" instead, if a consumer is started that does not
> have a valid offset, it would use the very latest message in the topic as
> the starting offset for message consumption.
>
> Now, if you are using the same consumer group each time you run the
> application (which it seems is true, as you have "test-group" hardwired in
> your application.yml), but you do not tear down your local cluster and
> clear out its state, you will indeed see the behaviour you describe.
> Remember that Kafka is durable, and maintains the offsets when the
> individual applications go away. So you are probably seeing this:
>
> 1) start application instance 1. It realizes it has no offset when it tries
> to register as a consumer on the input topic, so it creates a new consumer
> entry for "earliest" for your consumer group.
> 2) send message "asd"
> 3) application instance 1 receives "asd", processes it, and updates the
> offset (offset head = 1)
> 4) Terminate instance 1
> 5) Start application instance 2. It detects correctly that consumer group
> "test-group" is available and reads that offset as its starting point.
> 6) send message "{}"
> 7) application instance 2 receives "{}", processes it, and updates the
> offset (offset head = 2)
> *NOTE:* App instance 2 NEVER received "asd", nor should it, as it is
> telling the Kafka cluster that it belongs to the same consumer group as
> application 1.
>
> Hope this helps,
>
> Adam
>
>
>
>
>
> On Wed, Sep 12, 2018 at 6:57 AM, Yui Yoi  wrote:
>
> > TL;DR:
> > my streams application skips uncommitted messages
> >
> > Hello,
> > I'm using streams API via spring framework and experiencing a weird
> > behavior which I would like to get an explanation to:
> > First of all: The attached zip is my test project, I used kafka cli to
> run
> > a localhost broker and zookeeper
> >
> > what is happening is as follows:
> > 1. I send an invalid message, such as "asd", and my consumer has a lag
> and
> > error message as expected
> > 2. I send a valid message such as "{}", but instead of rereading the
> first
> > message as expected from an "earliest" configured application - my
> > application reads the latest message, commits it and ignoring the one in
> > error, thus i have no lag!
> > 3. When I'm running my application when there are uncommitted messages -
> > my application reads the FIRST not committed message, as if it IS an
> > "earliest" configured application!
> >
> > In your documentation you assure "at least once" behavior, but according
> > to section 2. it happens so my application does not receive those
> messages
> > not even once (as i said, those messages are uncommitted)
> >
> > My guess is that it has something to do with the stream's cache... I
> would
> > very like to have an explanation or even a solution
> >
> > I'm turning to you as a last resort, after long weeks of research and
> > experiments
> >
> > Thanks alot
> >
>


Re: A question about kafka streams API

2018-09-12 Thread Adam Bellemare
Hi Yui Yoi

Preface: I am not familiar with the spring framework.

"Earliest" when it comes to consuming from Kafka means, "Start reading from
the first message in the topic, *if there is no offset stored for that
consumer group*". It sounds like you are expecting it to re-read each
message whenever a new message comes in. This is not going to happen, as
there will be a committed offset and "earliest" will no longer be used. If
you were to use "latest" instead, if a consumer is started that does not
have a valid offset, it would use the very latest message in the topic as
the starting offset for message consumption.

Now, if you are using the same consumer group each time you run the
application (which it seems is true, as you have "test-group" hardwired in
your application.yml), but you do not tear down your local cluster and
clear out its state, you will indeed see the behaviour you describe.
Remember that Kafka is durable, and maintains the offsets when the
individual applications go away. So you are probably seeing this:

1) start application instance 1. It realizes it has no offset when it tries
to register as a consumer on the input topic, so it creates a new consumer
entry for "earliest" for your consumer group.
2) send message "asd"
3) application instance 1 receives "asd", processes it, and updates the
offset (offset head = 1)
4) Terminate instance 1
5) Start application instance 2. It detects correctly that consumer group
"test-group" is available and reads that offset as its starting point.
6) send message "{}"
7) application instance 2 receives "{}", processes it, and updates the
offset (offset head = 2)
*NOTE:* App instance 2 NEVER received "asd", nor should it, as it is
telling the Kafka cluster that it belongs to the same consumer group as
application 1.

Hope this helps,

Adam
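
To make that walkthrough concrete, here is a minimal sketch with the plain Java
consumer (broker address and topic name are hypothetical). The point is that
auto.offset.reset only applies when the group has no committed offset; once an
offset is committed, the consumer resumes from it, which is why instance 2
starts at offset 1 rather than rereading "asd".

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class EarliestResetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // hypothetical broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        // Only consulted when the group has no committed offset for a partition;
        // afterwards the committed offset wins and "earliest" is ignored.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("input-topic"));       // hypothetical topic name
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.printf("offset=%d value=%s%n", r.offset(), r.value()));
        }
    }
}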





On Wed, Sep 12, 2018 at 6:57 AM, Yui Yoi  wrote:

> TL;DR:
> my streams application skips uncommitted messages
>
> Hello,
> I'm using streams API via spring framework and experiencing a weird
> behavior which I would like to get an explanation to:
> First of all: The attached zip is my test project, I used kafka cli to run
> a localhost broker and zookeeper
>
> what is happening is as follows:
> 1. I send an invalid message, such as "asd", and my consumer has a lag and
> error message as expected
> 2. I send a valid message such as "{}", but instead of rereading the first
> message as expected from an "earliest" configured application - my
> application reads the latest message, commits it and ignoring the one in
> error, thus i have no lag!
> 3. When I'm running my application when there are uncommitted messages -
> my application reads the FIRST not committed message, as if it IS an
> "earliest" configured application!
>
> In your documentation you assure "at least once" behavior, but according
> to section 2. it happens so my application does not receive those messages
> not even once (as i said, those messages are uncommitted)
>
> My guess is that it has something to do with the stream's cache... I would
> very like to have an explanation or even a solution
>
> I'm turning to you as a last resort, after long weeks of research and
> experiments
>
> Thanks alot
>


A question about kafka streams API

2018-09-12 Thread Yui Yoi
TL;DR:
my streams application skips uncommitted messages

Hello,
I'm using the Streams API via the Spring framework and am experiencing a weird
behavior that I would like an explanation for.
First of all: the attached zip is my test project; I used the Kafka CLI to run
a localhost broker and ZooKeeper.

What is happening is as follows:
1. I send an invalid message, such as "asd", and my consumer shows a lag and
an error message, as expected.
2. I send a valid message such as "{}", but instead of rereading the first
message, as I would expect from an "earliest"-configured application, my
application reads the latest message, commits it, and ignores the one in
error, so I have no lag!
3. When I start my application while there are uncommitted messages, my
application reads the FIRST uncommitted message, as if it WERE an
"earliest"-configured application!

Your documentation promises "at least once" behavior, but according to
point 2 above my application does not receive those messages even once
(as I said, those messages are uncommitted).

My guess is that it has something to do with the stream's cache... I would
very much like an explanation, or even better a solution.

I'm turning to you as a last resort, after long weeks of research and
experiments

Thanks a lot


Re: Question about connector rebalancing

2018-09-05 Thread Gwen Shapira
Can you use stand-alone mode in that case?

On Wed, Sep 5, 2018 at 7:12 PM, Chen He  wrote:

> Hi Kafka experts
>
> I have a question about connector rebalancing issue. Why don't we make it
> option, I mean have a parameter that turn on/off it instead of having it as
> a must?
>
> We can have a parameter like: "connector.rebalancing.enable" parameter and
> make it as "true" by default. It allows users to turn it off if they want.
>
> There are some cases that connector rebalancing is not needed.
>
> Regards!
>
> Chen
>



-- 
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter <https://twitter.com/ConfluentInc> | blog
<http://www.confluent.io/blog>


Question about connector rebalancing

2018-09-05 Thread Chen He
Hi Kafka experts

I have a question about the connector rebalancing behavior. Why don't we make
it optional, i.e. add a parameter that turns it on or off instead of making it
mandatory?

We could add a parameter such as "connector.rebalancing.enable" and make it
"true" by default. That would allow users to turn rebalancing off if they want.

There are cases where connector rebalancing is not needed.

Regards!

Chen


Re: Question Regarding Apache Kafka replication

2018-08-14 Thread Jason Gustafson
Hi Shibha,

I have a KIP in the works which proposes to do this. Stay tuned.

-Jason

On Tue, Aug 14, 2018 at 4:43 PM, Shibha Malik  wrote:

> Hi Team,
>
> I wanted to know as to why Kafka does not support read by consumers from a
> replica of a partition which is not a leader of the partition ( but one of
> the partition on ISR ) ?
>
> Thanks,
> Shibha Malik
>


Question Regarding Apache Kafka replication

2018-08-14 Thread Shibha Malik
Hi Team,

I wanted to know why Kafka does not allow consumers to read from a replica of
a partition that is not the partition leader (but is one of the replicas in
the ISR)?

Thanks,
Shibha Malik


Re: Processor API StateStore and Recovery with State Machines question.

2018-07-26 Thread Adam Bellemare
Thanks Guozhang, I appreciate the explanation. That cleared up a lot for me
and confirmed what I thought.

Based on the above, in my scenario, the state machine could get into an
incorrect state. (Just for whoever may be watching this thread).

On Tue, Jul 24, 2018 at 6:20 PM, Guozhang Wang  wrote:

> Hello Adam,
>
> I figured that rather than answering your questions one-by-one, I'd give
> you a more general explanation between consumer offset commits, changelog
> and state store.
>
> If you have a state store update processor, the state store maintenance
> workflow is this:
>
>
> 1) updating the state store:
>
> 1.a write to state store.
> 1.b write to changelog topic
>
>
> Note that 1.a) could be async: the state store may be caching enabled, and
> also even the state store itself may have some write buffer (e.g. rocksDB);
> also 1.b) is async and batching enabled as well, and the actual sending
> request is done via another thread.
>
> So at the end of 1.b) either is possible: data is written persistently to
> the local files of the state store, but have not been sent to changelog, or
> data not written persistently to local files yet, but have been sent to
> changelog, or both have happened, or neither has happened.
>
>
> 2) committing the state store:
>
> 2.a) flush state store (make sure all previous writes have been persisted)
> 2.b) flush on producer (make sure all previous writes to changelog topics
> have been acknowledged).
> 2.c) commit offset.
>
> That is, if committing succeeded, by the end of 2.c) all should be done,
> and everything is consistent.
>
> Now if there is a crash after 1.b) and before 2), then like above said, any
> scenarios may happen, but note that consumer's offset will definitely NOT
> committed yet (it should only be done in 2.c) ), so upon restarting the
> data will be re-processed, and hence either state store's image or
> changelog may contained duplicated results, aka "at-least-once".
>
> 3) Finally, when exactly-once is enabled, if there is any crashes, the
> changelog topic / state store will be "rewinded" (I omit the implementation
> details here, but just assume that logically, we can rewind them) to the
> previously successful commit, so `exactly-once` is guaranteed.
>
>
> Guozhang
>
> On Sun, Jul 22, 2018 at 5:29 PM, Adam Bellemare 
> wrote:
>
> > Hi Folks
> >
> > I have a quick question about a scenario that I would appreciate some
> > insight on. This is related to a KIP I am working on, but I wanted to
> break
> > this out into its own scenario to reach a wider audience. In this
> scenario,
> > I am using builder.internalTopologyBuilder to create the following
> within
> > the internals of Kafka Streaming:
> >
> > 1) Internal Topic Source (builder.internalTopologyBuilder.addSource(...)
> )
> >
> > 2) ProcessorSupplier with StateStore, Changelogging enabled. For the
> > purpose of this question, this processor is a very simple state machine.
> > All it does is alternately block each other event, of a given key, from
> > processing. For instance:
> > (A,1)
> > (A,2)
> > (A,3)
> > It would block the propagation of (A,2). The state of the system after
> > processing each event is:
> > blockNext = true
> > blockNext = false
> > blockNext = true
> >
> > The expecation is that this component would always block the same event,
> in
> > any failure mode and subsequent recovery (ie: ALWAYS blocks (A,2), but
> not
> > (A,1) or (A,3) ). In other words, it would maintain perfect state in
> > accordance with the offsets of the upstream and downstream elements.
> >
> > 3) The third component is a KTable with a Materialized StateStore where I
> > want to sink the remaining events. It is also backed by a change log. The
> > events arriving would be:
> > (A,1)
> > (A,3)
> >
> > The components are ordered as:
> > 1 -> 2 -> 3
> >
> >
> > Note that I am keeping the state machine in a separate state store. My
> main
> > questions are:
> >
> > 1) Will this workflow be consistent in all manners of failure? For
> example,
> > are the state stores change logs fully written to internal topics before
> > the offset is updated for the consumer in #1?
> >
> > 2) Is it possible that one State Store with changelogging will be logged
> to
> > Kafka safely (say component #3) but the other (#2) will not be, prior to
> a
> > sudden, hard termination of the node?
> >
> > 3) Is the alternate possible, where #2 is backed up to its Kafka Topic
> but
> > #3 is not? Does the

Re: Processor API StateStore and Recovery with State Machines question.

2018-07-24 Thread Guozhang Wang
Hello Adam,

I figured that rather than answering your questions one by one, I'd give
you a more general explanation of how consumer offset commits, the changelog,
and the state store interact.

If you have a state store update processor, the state store maintenance
workflow is this:


1) updating the state store:

1.a write to state store.
1.b write to changelog topic


Note that 1.a) can be async: the state store may have caching enabled, and
the store itself may have its own write buffer (e.g. RocksDB); 1.b) is also
async and batched, and the actual send request is done by another thread.

So at the end of 1.b) any of the following is possible: the data has been
written persistently to the state store's local files but not yet sent to the
changelog, or it has been sent to the changelog but not yet written
persistently to local files, or both have happened, or neither has happened.


2) committing the state store:

2.a) flush state store (make sure all previous writes have been persisted)
2.b) flush on producer (make sure all previous writes to changelog topics
have been acknowledged).
2.c) commit offset.

That is, if committing succeeded, by the end of 2.c) all should be done,
and everything is consistent.

Now, if there is a crash after 1.b) and before 2), then, as said above, any of
those scenarios may happen, but note that the consumer's offset will definitely
NOT have been committed yet (that only happens in 2.c) ), so upon restart the
data will be re-processed, and hence either the state store's image or the
changelog may contain duplicated results, a.k.a. "at-least-once".

3) Finally, when exactly-once is enabled and there is a crash, the
changelog topic / state store will be "rewound" (I omit the implementation
details here; just assume that, logically, we can rewind them) to the
previously successful commit, so `exactly-once` is guaranteed.


Guozhang
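
To illustrate point 3), a minimal sketch of how exactly-once is switched on in
Streams (application id and broker address are hypothetical; this assumes
brokers 0.11.0+):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceStreamsProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-machine-app");   // hypothetical application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // hypothetical broker address
        // With at-least-once (the default), a crash between the changelog write in 1.b)
        // and the offset commit in 2.c) can replay records and duplicate state updates.
        // With exactly_once, the changelog writes and the offset commit are part of one
        // transaction, so on failure they are rolled back together.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        // How often step 2) (flush stores, flush producer, commit offsets) runs.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        return props;
    }
}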

On Sun, Jul 22, 2018 at 5:29 PM, Adam Bellemare 
wrote:

> Hi Folks
>
> I have a quick question about a scenario that I would appreciate some
> insight on. This is related to a KIP I am working on, but I wanted to break
> this out into its own scenario to reach a wider audience. In this scenario,
> I am using builder.internalTopologyBuilder to create the following within
> the internals of Kafka Streaming:
>
> 1) Internal Topic Source (builder.internalTopologyBuilder.addSource(...) )
>
> 2) ProcessorSupplier with StateStore, Changelogging enabled. For the
> purpose of this question, this processor is a very simple state machine.
> All it does is alternately block each other event, of a given key, from
> processing. For instance:
> (A,1)
> (A,2)
> (A,3)
> It would block the propagation of (A,2). The state of the system after
> processing each event is:
> blockNext = true
> blockNext = false
> blockNext = true
>
> The expecation is that this component would always block the same event, in
> any failure mode and subsequent recovery (ie: ALWAYS blocks (A,2), but not
> (A,1) or (A,3) ). In other words, it would maintain perfect state in
> accordance with the offsets of the upstream and downstream elements.
>
> 3) The third component is a KTable with a Materialized StateStore where I
> want to sink the remaining events. It is also backed by a change log. The
> events arriving would be:
> (A,1)
> (A,3)
>
> The components are ordered as:
> 1 -> 2 -> 3
>
>
> Note that I am keeping the state machine in a separate state store. My main
> questions are:
>
> 1) Will this workflow be consistent in all manners of failure? For example,
> are the state stores change logs fully written to internal topics before
> the offset is updated for the consumer in #1?
>
> 2) Is it possible that one State Store with changelogging will be logged to
> Kafka safely (say component #3) but the other (#2) will not be, prior to a
> sudden, hard termination of the node?
>
> 3) Is the alternate possible, where #2 is backed up to its Kafka Topic but
> #3 is not? Does the ordering of the topology matter in this case?
>
> 4) Is it possible that the state store #2 is updated and logged, but the
> source topic (#1) offset is not updated?
>
> In all of these cases, my main concern is keeping the state and the
> expected output consistent. For any failure mode, will I be able to recover
> to a fully consistent state given the requirements of the state machine in
> #2?
>
> Though this is a trivial example, I am not certain about the dynamics
> between maintaining state, recovering from internal changelog topics, and
> the order in which all of these things apply. Any words of wisdom or
> explanations would be helpful here. I have been looking through the code
> but I wanted to get second opinions on this.
>
>
>
> Thanks,
>
> Adam
>



-- 
-- Guozhang


Re: Question about issues of Kafka release version 1.1.1

2018-07-23 Thread Ismael Juma
Seems like you're right Lambdaliu. Rajini/Jason, can you please check and
update the JIRAs?

Ismael

On Mon, Jul 23, 2018 at 7:09 AM lambdaliu(刘少波) 
wrote:

> Hi team,
>
> I Have downloaded the source release of kafka version 1.1.1 and found the
> JIRA
> issues KAFKA-6911 and KAFKA-6809 listed in the release notes but it's PR
> looks
> like doesn't contain in the source release. Is this a valid situation?
> Should we
> create a JIRA issue to trace it?
>
> Regards,
> Lambdaliu(Shaobo Liu)
> 
>


Question about issues of Kafka release version 1.1.1

2018-07-23 Thread 刘少波
Hi team,

I have downloaded the source release of Kafka version 1.1.1 and found the JIRA
issues KAFKA-6911 and KAFKA-6809 listed in the release notes, but their PRs do
not appear to be contained in the source release. Is this a valid situation?
Should we create a JIRA issue to track it?

Regards,
Lambdaliu(Shaobo Liu)



Processor API StateStore and Recovery with State Machines question.

2018-07-22 Thread Adam Bellemare
Hi Folks

I have a quick question about a scenario that I would appreciate some
insight on. This is related to a KIP I am working on, but I wanted to break
this out into its own scenario to reach a wider audience. In this scenario,
I am using builder.internalTopologyBuilder to create the following within
the internals of Kafka Streams:

1) Internal Topic Source (builder.internalTopologyBuilder.addSource(...) )

2) ProcessorSupplier with a StateStore, changelogging enabled. For the
purpose of this question, this processor is a very simple state machine.
All it does is alternately block every other event of a given key from
processing. For instance:
(A,1)
(A,2)
(A,3)
It would block the propagation of (A,2). The state of the system after
processing each event is:
blockNext = true
blockNext = false
blockNext = true

The expectation is that this component would always block the same event, in
any failure mode and subsequent recovery (i.e. it ALWAYS blocks (A,2), but not
(A,1) or (A,3)). In other words, it would maintain perfect state in
accordance with the offsets of the upstream and downstream elements.

3) The third component is a KTable with a Materialized StateStore where I
want to sink the remaining events. It is also backed by a change log. The
events arriving would be:
(A,1)
(A,3)

The components are ordered as:
1 -> 2 -> 3


Note that I am keeping the state machine in a separate state store. My main
questions are:

1) Will this workflow be consistent in all manners of failure? For example,
are the state stores' changelogs fully written to internal topics before
the offset is updated for the consumer in #1?

2) Is it possible that one State Store with changelogging will be logged to
Kafka safely (say component #3) but the other (#2) will not be, prior to a
sudden, hard termination of the node?

3) Is the alternate possible, where #2 is backed up to its Kafka Topic but
#3 is not? Does the ordering of the topology matter in this case?

4) Is it possible that the state store #2 is updated and logged, but the
source topic (#1) offset is not updated?

In all of these cases, my main concern is keeping the state and the
expected output consistent. For any failure mode, will I be able to recover
to a fully consistent state given the requirements of the state machine in
#2?

Though this is a trivial example, I am not certain about the dynamics
between maintaining state, recovering from internal changelog topics, and
the order in which all of these things apply. Any words of wisdom or
explanations would be helpful here. I have been looking through the code
but I wanted to get second opinions on this.



Thanks,

Adam
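
Not an answer, but for readers trying to picture the setup, here is a rough
sketch of a comparable topology built with the public Topology API (and the
pre-2.7 Processor interface that was current at the time) rather than
builder.internalTopologyBuilder; the store, processor, and topic names are made
up, and the KTable sink from component 3 is simplified to a plain sink:

import java.util.Collections;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class AlternatingBlockerTopology {

    // Simple state machine: for each key, let one event through, block the next.
    static class AlternatingBlocker extends AbstractProcessor<String, String> {
        private KeyValueStore<String, String> flags;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            super.init(context);
            flags = (KeyValueStore<String, String>) context.getStateStore("block-flags");
        }

        @Override
        public void process(final String key, final String value) {
            if ("BLOCK".equals(flags.get(key))) {
                flags.put(key, "PASS");            // block this event, let the next one through
            } else {
                flags.put(key, "BLOCK");
                context().forward(key, value);     // propagate downstream
            }
        }
    }

    public static Topology build() {
        // Changelog-backed persistent store holding the per-key block/pass flag.
        StoreBuilder<KeyValueStore<String, String>> store =
            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("block-flags"),
                                        Serdes.String(), Serdes.String())
                  .withLoggingEnabled(Collections.emptyMap());

        ProcessorSupplier<String, String> blockerSupplier = AlternatingBlocker::new;

        Topology topology = new Topology();
        topology.addSource("source", "input-topic");                        // hypothetical topic name
        topology.addProcessor("blocker", blockerSupplier, "source");
        topology.addStateStore(store, "blocker");
        topology.addSink("sink", "output-topic", "blocker");                // hypothetical topic name
        return topology;
    }
}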


Re: Kafka topic retention question

2018-07-21 Thread Gwen Shapira
Ah, common source of confusion!

Each partition is divided into segments (1 GB each by default), and the active
segment, the one you are currently writing into, is never deleted.

So, until you write 1 GB to a partition, you will still see all of its
messages. If you need more accurate retention, you can configure a smaller
segment size for low-throughput topics.

Gwen
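
If it helps, a rough sketch of tightening the segment settings on a single
topic with the AdminClient (broker address, topic name, and the 64 MB / 1 hour
values are just illustrative; note that alterConfigs is the non-incremental
API, so it replaces the topic's existing dynamic overrides):

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class LowThroughputTopicRetention {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        // hypothetical broker address
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "low-volume-topic"); // hypothetical topic
            Config config = new Config(Arrays.asList(
                new ConfigEntry("retention.ms", String.valueOf(48L * 60 * 60 * 1000)),  // 48 hours
                // Roll segments at 64 MB or after 1 hour, whichever comes first, so old
                // records become eligible for deletion sooner than with 1 GB segments.
                new ConfigEntry("segment.bytes", String.valueOf(64 * 1024 * 1024)),
                new ConfigEntry("segment.ms", String.valueOf(60L * 60 * 1000))));
            admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}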

On Sat, Jul 21, 2018, 9:28 AM David Collette  wrote:

> I have been diving into to Kafka for the last couple of weeks. I am running
> some low volume (5-10 TPS) production data. I had set a default retention
> period  on all the topics for 48 hours. I have just noticed there is some
> behavior I don't understand. I see messages in the topic that are over a
> week old and they don't seem to be removed. Is there some documentation on
> the topic clean up process or guidance on how can I figure out why the
> messages are not being removed?
>
> This is what I am seeing on the topic:
>
> * messages from July 13 - July 15th
> * No messages from July 16th - July 18 (they were there at one point, but
> removed because retention I assume)
> * messages from July 20th and July 21 (this is what I expected with the 48
> hour retention
>
> Any guidance would be much appreciated.
>
>
> Thanks,
> David Collette
> collett...@gmail.com
>


Kafka topic retention question

2018-07-21 Thread David Collette
I have been diving into Kafka for the last couple of weeks. I am running
some low-volume (5-10 TPS) production data. I had set a default retention
period of 48 hours on all the topics. I have just noticed some behavior I
don't understand: I see messages in the topic that are over a week old and
they don't seem to be removed. Is there documentation on the topic cleanup
process, or guidance on how I can figure out why the messages are not being
removed?

This is what I am seeing on the topic:

* Messages from July 13 - July 15th
* No messages from July 16th - July 18th (they were there at one point, but
were removed due to retention, I assume)
* Messages from July 20th and July 21st (this is what I expected with the
48-hour retention)

Any guidance would be much appreciated.


Thanks,
David Collette
collett...@gmail.com


[jira] [Resolved] (KAFKA-4870) A question about broker down , the server is doing partition master election,the client producer may send msg fail . How the producer deal with the situation ??

2018-06-18 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-4870.
--
Resolution: Information Provided

If a produce request fails with a retriable exception, the producer automatically 
retries based on the retries config. The producer also refreshes its metadata on 
such exceptions, or when a partition has no leader, etc.

Please post these kinds of queries to the 
[us...@kafka.apache.org|mailto:us...@kafka.apache.org] mailing list 
([http://kafka.apache.org/contact]) for quicker responses.
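
A minimal sketch of the producer settings involved (broker address and topic
name are hypothetical): retriable errors such as a leader election in progress
are retried up to the configured number of times, the producer refreshes its
metadata to discover the new leader, and the send callback only sees an
exception once retries are exhausted or the error is not retriable.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ResilientProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // hypothetical broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Retry retriable errors (e.g. NOT_LEADER_FOR_PARTITION during an election).
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 200);
        // Keep per-partition ordering while retrying.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("orders", "key", "value"),       // hypothetical topic name
                (metadata, exception) -> {
                    if (exception != null) {
                        // Reached only after retries are exhausted or for a non-retriable error.
                        exception.printStackTrace();
                    }
                });
            producer.flush();
        }
    }
}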

> A question about broker down , the server is doing partition master 
> election,the client producer may send msg fail . How the producer deal with 
> the situation ??
> 
>
> Key: KAFKA-4870
> URL: https://issues.apache.org/jira/browse/KAFKA-4870
> Project: Kafka
>  Issue Type: Test
>  Components: clients
> Environment: java client 
>Reporter: zhaoziyan
>Priority: Minor
>
> When a broker goes down, the Kafka cluster performs partition leader election; 
> during that time the producer may fail to send messages (ordered or normal). 
> How does the client update its metadata and handle the failed sends?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

