[jira] [Created] (KAFKA-14366) Kafka consumer rebalance issue, offsets points back to very old committed offset

2022-11-07 Thread Chetan (Jira)
Chetan created KAFKA-14366:
--

 Summary: Kafka consumer rebalance issue, offsets points back to 
very old committed offset
 Key: KAFKA-14366
 URL: https://issues.apache.org/jira/browse/KAFKA-14366
 Project: Kafka
  Issue Type: Bug
  Components: consumer, offset manager
Affects Versions: 2.8.1
 Environment: Production
Reporter: Chetan
 Attachments: rebalance issue.docx

Hi All,

We are facing an issue when client consumers restart (not all restarts end up 
with this issue): during the rebalancing scenario, one of the partition offsets 
sometimes goes back a long way from the committed offset.

Scenario :

Assume we have 4 instances of consumer and restarts of consumer one after the 
other.
 # When the restarts begin, assume the committed offset on partition 10 of the 
consumed topic is pointing to 5 (the latest offset).
 # As the restarts proceed, the offset suddenly starts pointing back to 2.
 # While the restarts are still in progress, the consumer that holds the partition 
starts reading from 2 onwards.
 # Once all rebalances are complete, every message from offset 2 to 5 (where 
consumption had originally stopped) has been re-read.
We end up with around 30K duplicates.

(The numbers here are just an example. In production we see huge numbers of 
duplicates; roughly two out of every ten restart/rebalance exercises end up with 
such duplicates, and only one or two partitions, seemingly at random, behave this 
way.)

This seems to be a bug. I am attaching all screenshots for reference as well.

Can someone kindly help out here?
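
In case it helps with triage, below is a minimal diagnostic sketch (bootstrap 
servers, group id and topic are made up) that logs the committed offset and the 
fetch position of every partition handed out during a rebalance, so the jump from 
offset 5 back to 2 shows up directly in the client logs:
{code:java}
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class RebalanceOffsetLogger {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // made up
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // made up
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Log the last position before the partitions are taken away.
                for (TopicPartition tp : partitions) {
                    System.out.printf("revoked %s at position %d%n", tp, consumer.position(tp));
                }
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Compare the committed offset with the position the consumer will fetch from.
                Map<TopicPartition, OffsetAndMetadata> committed =
                        consumer.committed(new HashSet<>(partitions));
                for (TopicPartition tp : partitions) {
                    OffsetAndMetadata om = committed.get(tp);
                    System.out.printf("assigned %s committed=%s position=%d%n",
                            tp, om == null ? "none" : om.offset(), consumer.position(tp));
                }
            }
        });
        while (true) {
            consumer.poll(Duration.ofSeconds(1));
        }
    }
}
{code}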



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1344

2022-11-07 Thread Apache Jenkins Server
See 




Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #1343

2022-11-07 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-878: Autoscaling for Statically Partitioned Streams

2022-11-07 Thread Matthias J. Sax
Thanks for the KIP Sophie. Seems there is a lively discussion going on. 
I tried to read up on the history and I hope I don't repeat what was 
already discussed.


And sorry for the quite long email...


(1) Stateless vs Stateful

I agree that stateless apps should be supported, even if I am not sure 
how many stateless apps will benefit from it. If an app is stateless, why 
would one need to repartition to begin with? Stateless apps are most 
likely apps with a single sub-topology and thus don't need this 
feature to handle input topic scale out. Of course, there could be some 
apps with more than one sub-topology, and I don't see any reason why we 
should not support scaling those out?


However, the point is that, from my understanding, this feature is mainly 
useful for stateful apps.



(2) Config

I am not sure if using `static.partitioner.class` is a good choice and I 
would personally opt for a boolean config. The reason is (as already 
mentioned by Bruno) that (stateful) apps might have a single 
sub-topology: for this case, the static partitioning must already be 
enforced upstream, and Kafka Streams must "just" add a new partition to 
the state changelog topics to scale out. It seems odd to force users to 
pass in a partitioner that might not be used by the runtime (the only 
exception might be IQ, which might not be used).


I also don't understand why we would need to enforce that downstream 
output topics use the same static partitioning as the input or 
any repartition topics? We don't know anything about the potential 
chaining of apps, and it's also not clear to me why the output topic 
would need to be scaled as claimed (it's a possibility, but I am sure 
there are many cases for which the output topic is not touched and 
standard hash/range/random partitioning is used and is just fine)? In the 
end, it's the user's responsibility and we should not enforce artificial 
limitations (cf. (4) below).


I agree that we might want to add a new `default.partitioner` config 
though, to make it simpler for users to change the partitioner globally 
instead of overwriting it method by method, in case users need it.
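
Just to illustrate the shape I have in mind, a minimal sketch (the boolean flag 
name below is purely hypothetical, not a proposal):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StaticPartitioningConfigSketch {
    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");             // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative
        // Hypothetical boolean flag: the user promises that the input topics are
        // statically partitioned; no partitioner instance is forced onto the runtime.
        props.put("static.partitioning.enabled", "true");
        return props;
    }
}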



(3) StaticPartitioner

Do we really need this new interface? The only benefit I see is the 
added callback `onPartitionExpansion(...)` (but we can add this to 
existing `StreamPartitioner` interface, too). In particular, I don't see 
any benefit in adding `staticPartition(...)` method -- if we say it's 
the users responsibility to implement a static partitioning strategy, 
they can just implement the existing `partition(...)` method IMHO. I 
don't see what we gain by the new interface?
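
To make this concrete, a toy sketch of a static strategy expressed against the 
existing interface (the key format is made up; the error case relates to (3b) 
below):

import org.apache.kafka.streams.processor.StreamPartitioner;

public class KeyEncodedPartitioner implements StreamPartitioner<String, String> {
    @Override
    public Integer partition(String topic, String key, String value, int numPartitions) {
        // "Static" strategy: the key itself carries its target partition (e.g. "order-7"),
        // so adding partitions never re-maps existing keys; only new keys use new partitions.
        int target = Integer.parseInt(key.substring(key.lastIndexOf('-') + 1));
        if (target >= numPartitions) {
            // The (3b) situation: the key points at a partition that does not exist yet.
            throw new IllegalStateException("Partition " + target + " not available for key " + key);
        }
        return target;
    }
}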



(3a) About `onPartitionExpansion()`: why do we need to pass in old/new 
partition count?



(3b) Why should users throw a `TaskMigratedException` if they want to 
put a record into a non-existing partition? The name seems inappropriate 
to me.
 -> I am also not sure how this could happen, except for a user error, 
i.e., when the user writes new keys into the input topic before the 
expansion operation is finished; and for this case it seems ok to just 
crash (maybe the user did not even enable the feature or did not intend 
to scale the app at all and wrote a "bad key" into the input topic; for 
the latter case, we might end up in an infinite rebalance as the input 
topic was not scaled to begin with). -- Again, it seems we cannot (and 
should not try to) guard the user against this case?




(4) User Responsibility

This feature is for advanced users only, and they carry a lot of 
responsibility to use it correctly. For stateful single sub-topology 
cases, their responsibility starts upstream, by ensuring that the input 
topic is partitioned statically.


Thus, I don't understand why we want to disallow any overwrite of the 
partitioner in the code (and enforce a single partitioner 
implementation)? Similar to anything else, it's the user's responsibility 
to do the correct thing, and it feels like an artificial safeguard to me 
to disallow it. I would prefer full flexibility, because if there are 
100 ways users can misuse this feature, it does not buy us much to limit 
it to 99 ways with those restrictions, and it will make the implementation 
(of the feature) much simpler if we don't have restrictions but put the 
burden onto the user.



(5) Runtime

There is a larger section about runtime handling and I am not sure if I 
fully understand everything.


For example:


However, it should be noted that you should not change the partitioner for 
existing applications and so this feature will generally be limited to new 
applications only.


What do you mean by this and why would we limit the feature to new apps? 
Given the stateful single sub-topology example from above, I don't see 
any reason why such an app should not benefit from it (given that the 
input topic is already statically partitioned)?



Furthermore, what do you mean by:


No repartitioning of internal topics will be performed 

Re: [DISCUSS] KIP-852 Optimize calculation of size for log in remote tier

2022-11-07 Thread Jun Rao
Hi, Divij,

Thanks for the KIP. Sorry for the late reply.

The motivation of the KIP is to improve the efficiency of size based
retention. I am not sure the proposed changes are enough. For example, if
the size exceeds the retention size, we need to determine the subset of
segments to delete to bring the size within the retention limit. Do we need
to call RemoteLogMetadataManager.listRemoteLogSegments() to determine that?
Also, what about time-based retention? To make that efficient, do we need
to make some additional interface changes?

An alternative approach is for the RLMM implementor to make sure
that RemoteLogMetadataManager.listRemoteLogSegments() is fast (e.g., with
local caching). This way, we could keep the interface simple. Have we
considered that?
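
For reference, the per-call cost I am thinking about (and which an implementor 
could amortize with local caching) is roughly the following, assuming the KIP-405 
metadata interfaces:

import java.util.Iterator;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;

public class RemoteLogSizeSketch {
    // Walks the segment metadata and sums the sizes; this is the per-call cost the KIP
    // wants to avoid, and also what an RLMM could keep as a cached running total instead.
    public static long remoteLogSize(RemoteLogMetadataManager rlmm, TopicIdPartition tp)
            throws RemoteStorageException {
        long total = 0;
        Iterator<RemoteLogSegmentMetadata> segments = rlmm.listRemoteLogSegments(tp);
        while (segments.hasNext()) {
            total += segments.next().segmentSizeInBytes();
        }
        return total;
    }
}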

Thanks,

Jun

On Wed, Sep 28, 2022 at 6:28 AM Divij Vaidya 
wrote:

> Hey folks
>
> Does anyone else have any thoughts on this before I propose this for a
> vote?
>
> --
> Divij Vaidya
>
>
>
> On Mon, Sep 5, 2022 at 12:57 PM Satish Duggana 
> wrote:
>
> > Thanks for the KIP Divij!
> >
> > This is a nice improvement to avoid recalculation of size. Customized
> RLMMs
> > can implement the best possible approach by caching or maintaining the
> size
> > in an efficient way. But this is not a big concern for the default topic
> > based RLMM as mentioned in the KIP.
> >
> > ~Satish.
> >
> > On Wed, 13 Jul 2022 at 18:48, Divij Vaidya 
> > wrote:
> >
> > > Thank you for your review Luke.
> > >
> > > > Reg: is that would the new `RemoteLogSizeBytes` metric be a
> performance
> > > overhead? Although we move the calculation to a separate API, we still
> > > can't assume users will implement a light-weight method, right?
> > >
> > > This metric would be logged using the information that is already being
> > > calculated for handling remote retention logic, hence, no additional
> work
> > > is required to calculate this metric. More specifically, whenever
> > > RemoteLogManager calls getRemoteLogSize API, this metric would be
> > captured.
> > > This API call is made every time RemoteLogManager wants to handle
> expired
> > > remote log segments (which should be periodic). Does that address your
> > > concern?
> > >
> > > Divij Vaidya
> > >
> > >
> > >
> > > On Tue, Jul 12, 2022 at 11:01 AM Luke Chen  wrote:
> > >
> > > > Hi Divij,
> > > >
> > > > Thanks for the KIP!
> > > >
> > > > I think it makes sense to delegate the responsibility of calculation
> to
> > > the
> > > > specific RemoteLogMetadataManager implementation.
> > > > But one thing I'm not quite sure, is that would the new
> > > > `RemoteLogSizeBytes` metric be a performance overhead?
> > > > Although we move the calculation to a separate API, we still can't
> > assume
> > > > users will implement a light-weight method, right?
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > On Fri, Jul 1, 2022 at 5:47 PM Divij Vaidya  >
> > > > wrote:
> > > >
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-852%3A+Optimize+calculation+of+size+for+log+in+remote+tier
> > > > >
> > > > >
> > > > > Hey folks
> > > > >
> > > > > Please take a look at this KIP which proposes an extension to
> > KIP-405.
> > > > This
> > > > > is my first KIP with Apache Kafka community so any feedback would
> be
> > > > highly
> > > > > appreciated.
> > > > >
> > > > > Cheers!
> > > > >
> > > > > --
> > > > > Divij Vaidya
> > > > > Sr. Software Engineer
> > > > > Amazon
> > > > >
> > > >
> > >
> >
>


Re: Regarding Producer Metric - "request-latency-avg"

2022-11-07 Thread Amrit Gupta
So request-latency-avg is the time after drain?

Could you share a more detailed explanation of how this metric calculation
is done?
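
For reference, here is a rough snippet of how both values can be read directly 
from a producer instance (metric names are matched loosely on purpose rather than 
hard-coded):

import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ProducerTimingMetrics {
    // Print the latency / queue-time metrics straight from the producer instance.
    // Names are matched loosely; the exact ones live in the producer-metrics group.
    public static void dump(KafkaProducer<?, ?> producer) {
        for (Map.Entry<MetricName, ? extends Metric> e : producer.metrics().entrySet()) {
            String name = e.getKey().name();
            if (name.contains("latency") || name.contains("queue-time")) {
                System.out.printf("%s (%s) = %s%n",
                        name, e.getKey().group(), e.getValue().metricValue());
            }
        }
    }
}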

On Tue, 8 Nov 2022 at 1:30 AM, Philip Nee  wrote:

> Hey Amrit,
>
> Actually, I think the confluent guide is correct because it seems like the
> request is created here
> 
> .
> and the queue time
> 
> is actually the time difference between the batch creation and drain - each
> runOnce() in the sender loop first drains, then invoke sendProduceRequest.
> Sorry about the previous misleading reply.
>
> P
>
> On Mon, Nov 7, 2022 at 11:39 AM Amrit Gupta 
> wrote:
>
>> Bumping
>>
>> On Mon, 7 Nov 2022 at 12:45 PM, Amrit Gupta 
>> wrote:
>>
>>> Hi Philip,
>>>
>>> >>  the latency measures the time difference between the *time the
>>> request was created* and the response received
>>> >>  I believe it is *between the time send() was called*, and the time
>>> the producer receives the response from the broker.
>>>
>>> I believe this is not right as if we see
>>> 
>>> :
>>> [image: image.png]
>>>
>>> >> the producer doesn't immediately transmit the sends when send() is
>>> called, the requests can be batched depending on your configuration.
>>>
>>> if it was like the way you mentioned,  then it implies it includes the
>>> producer buffer time also as you mentioned then *request-latency-avg*
>>> should be *always greater* than *request-queue-time-avg. *But as per
>>> the results above we can see in multiple cases, it is actually vice-versa.
>>>
>>>
>>> Please add more thoughts to this and let me know what you think.
>>>
>>> Thanks,
>>> T
>>>
>>


Re: Regarding Producer Metric - "request-latency-avg"

2022-11-07 Thread Philip Nee
Hey Amrit,

Actually, I think the confluent guide is correct because it seems like the
request is created here

.
and the queue time

is actually the time difference between the batch creation and drain - each
runOnce() in the sender loop first drains, then invokes sendProduceRequest.
Sorry about the previous misleading reply.

P

On Mon, Nov 7, 2022 at 11:39 AM Amrit Gupta  wrote:

> Bumping
>
> On Mon, 7 Nov 2022 at 12:45 PM, Amrit Gupta 
> wrote:
>
>> Hi Philip,
>>
>> >>  the latency measures the time difference between the *time the
>> request was created* and the response received
>> >>  I believe it is *between the time send() was called*, and the time
>> the producer receives the response from the broker.
>>
>> I believe this is not right as if we see
>> 
>> :
>> [image: image.png]
>>
>> >> the producer doesn't immediately transmit the sends when send() is
>> called, the requests can be batched depending on your configuration.
>>
>> if it was like the way you mentioned,  then it implies it includes the
>> producer buffer time also as you mentioned then *request-latency-avg*
>> should be *always greater* than *request-queue-time-avg. *But as per the
>> results above we can see in multiple cases, it is actually vice-versa.
>>
>>
>> Please add more thoughts to this and let me know what you think.
>>
>> Thanks,
>> T
>>
>


Re: Regarding Producer Metric - "request-latency-avg"

2022-11-07 Thread Amrit Gupta
Bumping

On Mon, 7 Nov 2022 at 12:45 PM, Amrit Gupta  wrote:

> Hi Philip,
>
> >>  the latency measures the time difference between the *time the
> request was created* and the response received
> >>  I believe it is *between the time send() was called*, and the time
> the producer receives the response from the broker.
>
> I believe this is not right as if we see
> 
> :
> [image: image.png]
>
> >> the producer doesn't immediately transmit the sends when send() is
> called, the requests can be batched depending on your configuration.
>
> if it was like the way you mentioned,  then it implies it includes the
> producer buffer time also as you mentioned then *request-latency-avg*
> should be *always greater* than *request-queue-time-avg. *But as per the
> results above we can see in multiple cases, it is actually vice-versa.
>
>
> Please add more thoughts to this and let me know what you think.
>
> Thanks,
> T
>


[jira] [Created] (KAFKA-14365) Refactor Fetcher to allow different implementations

2022-11-07 Thread Kirk True (Jira)
Kirk True created KAFKA-14365:
-

 Summary: Refactor Fetcher to allow different implementations
 Key: KAFKA-14365
 URL: https://issues.apache.org/jira/browse/KAFKA-14365
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer
Reporter: Kirk True
Assignee: Kirk True


The `Fetcher` API is used internally by the `KafkaConsumer` to fetch records 
from the brokers. There is ongoing work to create a new consumer implementation 
with a significantly refactored threading model. The threading refactor work 
requires a similarly refactored `Fetcher`. In order to keep the existing 
`KafkaConsumer` as untouched as possible, this Jira proposes to refactor 
`Fetcher` so that alternative implementations can be plugged into the existing 
unit tests and the `KafkaConsumer`.

Here are the proposed steps:
 # Extract out the common APIs used by the `KafkaConsumer` and related unit 
tests into a new Java interface named `Fetcher`
 # Rename the existing `Fetcher` as `KafkaFetcher` (or similar)
 # Refactor the `KafkaConsumer`, `FetcherTest`, and other call sites to 
primarily use the new `Fetcher` interface

A future pull request will add the new `Fetcher` implementation and tie it in 
to the existing `FetcherTest` tests.
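
For illustration only (the method names below are hypothetical placeholders, not 
a design commitment), steps 1 and 2 would look roughly like:
{code:java}
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

// Step 1: the narrow surface that KafkaConsumer and FetcherTest actually depend on.
public interface Fetcher<K, V> extends Closeable {
    int sendFetches();
    Map<TopicPartition, List<ConsumerRecord<K, V>>> collectFetch();
}

// Step 2 would then simply be:
//   class KafkaFetcher<K, V> implements Fetcher<K, V> { ...existing body unchanged... }
{code}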



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14364) Support evolving serde with Foreign Key Join

2022-11-07 Thread John Roesler (Jira)
John Roesler created KAFKA-14364:


 Summary: Support evolving serde with Foreign Key Join
 Key: KAFKA-14364
 URL: https://issues.apache.org/jira/browse/KAFKA-14364
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


The current implementation of Foreign-Key join uses a hash comparison to 
determine whether it should emit join results or not. See 
[https://github.com/apache/kafka/blob/807c5b4d282e7a7a16d0bb94aa2cda9566a7cc2d/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java#L94-L110]

As specified in KIP-213 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]
 ), we must do a comparison of this nature in order to get correct results when 
the foreign-key reference changes, as the old reference might emit delayed 
results after the new instance generates its updated results, leading to an 
incorrect final join state.

The hash comparison prevents this race condition by ensuring that any emitted 
results correspond to the _current_ version of the left-hand-side record (and 
therefore that the foreign-key reference itself has not changed).

An undesired side-effect of this is that if users update their serdes (in a 
compatible way), for example to add a new optional field to the record, then 
the resulting hash will change for existing records. This will cause Streams to 
stop emitting results for those records until a new left-hand-side update comes 
in, recording a new hash for those records.

It should be possible to provide a fix. Some ideas:
 * only consider the foreign-key reference itself in the hash function (this 
was the original proposal, but we opted to hash the entire record as an 
optimization to suppress unnecessary updates).
 * provide a user-overridable hash function. This would be more flexible, but 
also pushes a lot of complexity onto users, and opens up the possibility to 
completely break semantics.

We will need to design the solution carefully so that we can preserve the 
desired correctness guarantee.
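
A rough sketch of the first idea (illustrative only; the hash function shown here 
is a stand-in for whatever the join actually uses):
{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class FkHashExample {
    // Current behaviour (simplified): hash the entire serialized left-hand-side value,
    // so adding an optional field changes the bytes and therefore the hash.
    static byte[] hashOfWholeValue(byte[] serializedValue) throws NoSuchAlgorithmException {
        return MessageDigest.getInstance("MD5").digest(serializedValue);
    }

    // Idea 1 above: hash only the extracted foreign-key reference, so the hash stays
    // stable across compatible value-serde changes (at the cost of fewer suppressed
    // no-op updates).
    static byte[] hashOfForeignKeyOnly(String foreignKey) throws NoSuchAlgorithmException {
        return MessageDigest.getInstance("MD5").digest(foreignKey.getBytes(StandardCharsets.UTF_8));
    }
}
{code}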



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] KIP-877: Mechanism for plugins and connectors to register metrics

2022-11-07 Thread Mickael Maison
Hi,

I have opened KIP-877 to make it easy for plugins and connectors to
register their own metrics:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-877%3A+Mechanism+for+plugins+and+connectors+to+register+metrics

Let me know if you have any feedback or suggestions.

Thanks,
Mickael


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #1341

2022-11-07 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 510890 lines...]
[2022-11-07T15:38:42.739Z] 
[2022-11-07T15:38:42.739Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuter[caching
 enabled = false] STARTED
[2022-11-07T15:38:49.819Z] 
[2022-11-07T15:38:49.819Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuter[caching
 enabled = false] PASSED
[2022-11-07T15:38:49.819Z] 
[2022-11-07T15:38:49.819Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeft[caching
 enabled = false] STARTED
[2022-11-07T15:38:55.641Z] 
[2022-11-07T15:38:55.641Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeft[caching
 enabled = false] PASSED
[2022-11-07T15:38:55.641Z] 
[2022-11-07T15:38:55.641Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerInner[caching
 enabled = false] STARTED
[2022-11-07T15:39:01.666Z] 
[2022-11-07T15:39:01.666Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerInner[caching
 enabled = false] PASSED
[2022-11-07T15:39:01.666Z] 
[2022-11-07T15:39:01.666Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerOuter[caching
 enabled = false] STARTED
[2022-11-07T15:39:07.337Z] 
[2022-11-07T15:39:07.337Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerOuter[caching
 enabled = false] PASSED
[2022-11-07T15:39:07.337Z] 
[2022-11-07T15:39:07.337Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerLeft[caching
 enabled = false] STARTED
[2022-11-07T15:39:13.857Z] 
[2022-11-07T15:39:13.857Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerLeft[caching
 enabled = false] PASSED
[2022-11-07T15:39:13.857Z] 
[2022-11-07T15:39:13.857Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuterInner[caching
 enabled = false] STARTED
[2022-11-07T15:39:19.681Z] 
[2022-11-07T15:39:19.681Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuterInner[caching
 enabled = false] PASSED
[2022-11-07T15:39:19.681Z] 
[2022-11-07T15:39:19.681Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuterOuter[caching
 enabled = false] STARTED
[2022-11-07T15:39:30.080Z] 
[2022-11-07T15:39:30.080Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > TableTableJoinIntegrationTest > [caching enabled = false] > 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testOuterOuter[caching
 enabled = false] PASSED
[2022-11-07T15:39:30.080Z] 
[2022-11-07T15:39:30.080Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > TaskAssignorIntegrationTest > 
shouldProperlyConfigureTheAssignor STARTED
[2022-11-07T15:39:30.080Z] 
[2022-11-07T15:39:30.080Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > TaskAssignorIntegrationTest > 
shouldProperlyConfigureTheAssignor PASSED
[2022-11-07T15:39:30.080Z] 
[2022-11-07T15:39:30.080Z] Gradle Test Run :streams:integrationTest > Gradle 
Test Executor 167 > TaskMetadataIntegrationTest > 
shouldReportCorrectEndOffsetInformation STARTED

[jira] [Resolved] (KAFKA-14346) Remove static methods from internal Connect APIs for easier testing

2022-11-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14346.
---
Fix Version/s: 3.4.0
   Resolution: Fixed

> Remove static methods from internal Connect APIs for easier testing
> ---
>
> Key: KAFKA-14346
> URL: https://issues.apache.org/jira/browse/KAFKA-14346
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.4.0
>
>
> Our use of static methods for internal APIs such as the [RestClient 
> class|https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java]
>  and 
> [Plugins::compareAndSwapLoaders|https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java#L123-L129]
>  makes testing difficult, especially with the in-progress migration from 
> EasyMock/PowerMock to Mockito.
> We should remove these static methods and replace them with non-static 
> methods that can be more easily mocked.
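
For context, the kind of change this covers is roughly the following (the types 
and signatures below are stand-ins for illustration, not the real Connect classes):
{code:java}
import static org.mockito.Mockito.mock;

// Stand-in types for illustration only -- not the actual Connect classes or signatures.
class RestClientLike {
    String httpRequest(String url) { return "response from " + url; }
}

class HerderLike {
    private final RestClientLike restClient;          // previously reached through static methods
    HerderLike(RestClientLike restClient) { this.restClient = restClient; }
    String forward(String url) { return restClient.httpRequest(url); }
}

class HerderLikeTest {
    void testForward() {
        // With an injected instance, Mockito can replace the HTTP layer entirely.
        HerderLike herder = new HerderLike(mock(RestClientLike.class));
        herder.forward("http://localhost:8083/connectors");
    }
}
{code}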



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14363) Add new `coordinator` module

2022-11-07 Thread David Jacot (Jira)
David Jacot created KAFKA-14363:
---

 Summary: Add new `coordinator` module
 Key: KAFKA-14363
 URL: https://issues.apache.org/jira/browse/KAFKA-14363
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-07 Thread Mikael (Jira)
Mikael created KAFKA-14362:
--

 Summary: Same message consumed by two consumers in the same group  
after client restart
 Key: KAFKA-14362
 URL: https://issues.apache.org/jira/browse/KAFKA-14362
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.1.1
 Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64
Reporter: Mikael


Trigger scenario:

Two Kafka client application instances on separate EC2 instances with one 
consumer each, consuming from the same 8 partition topic using the same group 
ID. Duplicate consumption of a handful of messages sometimes happens right 
after one of the application instances has been restarted.

Additional information:

Messages are produced to the topic by a Kafka streams topology deployed on four 
application instances. I have verified that each message is only produced once 
by enabling debug logging in the topology flow right before producing each 
message to the topic.

The example logs below are from a test run in which a batch of 11 messages was 
consumed at 10:28:26,771 on the restarted instance and 9 of them were consumed 
as part of a larger batch at 10:28:23,824 on the other instance. Application 
shutdown was initiated at 10:27:13,086 and completed at 10:27:15,164; startup 
was initiated at 10:28:05,029 and completed at 10:28:37,491.

Kafka consumer group logs after restart on the instance that was restarted:

 
{code:java}
2022-11-07 10:28:23,678 INFO [consumer-0-C-1] o.a.k.c.Metadata 
[Metadata.java:287] [Consumer clientId=consumer-xms-batch-mt-callback-3, 
groupId=xms-batch-mt-callback] Cluster ID: B7Q1-xJpQW6EGbp35t2VnA
2022-11-07 10:28:23,678 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
[AbstractCoordinator.java:853] [Consumer 
clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
Discovered group coordinator 
b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094 (id: 
2147483646 rack: null)
2022-11-07 10:28:23,688 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
[AbstractCoordinator.java:535] [Consumer 
clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
(Re-)joining group
2022-11-07 10:28:23,736 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
[AbstractCoordinator.java:1000] [Consumer 
clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
Request joining group due to: need to re-join with the given member-id
2022-11-07 10:28:23,737 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
[AbstractCoordinator.java:535] [Consumer 
clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
(Re-)joining group
2022-11-07 10:28:23,797 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
[AbstractCoordinator.java:595] [Consumer 
clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
Successfully joined group with generation Generation{generationId=676, 
memberId='consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243',
 protocol='cooperative-sticky'}
2022-11-07 10:28:23,835 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
[AbstractCoordinator.java:761] [Consumer 
clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
Successfully synced group in generation Generation{generationId=676, 
memberId='consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243',
 protocol='cooperative-sticky'}
2022-11-07 10:28:23,838 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
[ConsumerCoordinator.java:395] [Consumer 
clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
Updating assignment with
        Assigned partitions:                       []
        Current owned partitions:                  []
        Added partitions (assigned - owned):       []
        Revoked partitions (owned - assigned):     []
2022-11-07 10:28:23,838 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
[ConsumerCoordinator.java:279] [Consumer 
clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
Notifying assignor about the new Assignment(partitions=[])
2022-11-07 10:28:23,838 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
[ConsumerCoordinator.java:291] [Consumer 
clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
Adding newly assigned partitions: 
2022-11-07 10:28:25,315 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
[AbstractCoordinator.java:1000] [Consumer 
clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
Request joining group due to: group is already rebalancing
2022-11-07 10:28:25,317 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
[AbstractCoordinator.java:535] [Consumer 
clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
(Re-)joining group
2022-11-07 10:28:25,319 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 

Re: [DISCUSS] KIP-872: Add Serializer#serializeToByteBuffer() to reduce memory copying

2022-11-07 Thread Divij Vaidya
Apologies for the late reply. I will be more proactive on this thread moving
forward.

3. Understood. Perhaps, `ByteBuffer#asReadOnlyBuffer()` is not the right
solution and I acknowledge that the current API modifies the offsets of the
user provided input when it calls flip(). I still believe that we should
not be modifying (both offsets and data) the user provided input
bytebuffer, but I understand that it would require a change in semantics
wrt existing API behaviour. I would vote for having new/correct semantics
introduced with this KIP itself (else, as John mentioned, we would have to
add a new method later). I am advocating for new semantics because it
clarifies the contract rigorously (immutable state of input params) which
enables consumers of the API to do some nifty things on their end wrt
memory management.
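
To illustrate what I mean by leaving the caller's state untouched, a plain JDK 
sketch (nothing Kafka-specific):

import java.nio.ByteBuffer;

class BufferSemanticsExample {
    // Current style: flip() mutates the caller's position/limit.
    static ByteBuffer flipInPlace(ByteBuffer data) {
        data.flip();                        // caller's buffer is now changed
        return data;
    }

    // Proposed style: work on a duplicate, so the caller's offsets (and data) stay untouched.
    static ByteBuffer readView(ByteBuffer data) {
        ByteBuffer view = data.duplicate(); // shares the bytes, independent position/limit
        view.flip();
        return view.asReadOnlyBuffer();     // consumers cannot modify the content either
    }
}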

4. Regarding #4 I mentioned earlier, do you agree with the comment? If yes,
can you please add the JavaDocs to the API contract defined in KIP?

5. From a backward compatibility perspective, would the offsets for the
user provided ByteBuffer (key & value) remain the same as the earlier
implementation for `doSend()`? Could we add a test to verify this? Perhaps,
this is worth clarifying in the KIP?

--
Divij Vaidya



On Sun, Nov 6, 2022 at 4:23 PM John Roesler  wrote:

> Thanks for the reply, ShunKang!
>
> You’re absolutely right, we should not change the behavior of the existing
> method.
>
> Regarding the new method, I was thinking that this is a good opportunity
> to correct what seems to be strange semantics in the original one. If we
> keep the same semantics and want to correct it later, we’ll be forced to
> introduce yet another method later. This especially makes sense if we’re
> thinking of deprecating the original method. But if you think it’s better
> to keep it the way it is, I’m fine with it.
>
> I have no other comments.
>
> Thanks again for the KIP,
> John
>
> On Sat, Nov 5, 2022, at 11:59, ShunKang Lin wrote:
> > Hi John,
> >
> > Thanks for your comments!
> >
> > For your first question, I see some unit test cases that give us a
> > ByteBuffer not set to read before calling
> > `ByteBufferSerializer#serialize(String, ByteBuffer)`, e.g.
> > `ArticleSerializer`, `AugmentedArticleSerializer`,
> > `AugmentedCommentSerializer` and `CommentSerializer`. If we don't flip
> the
> > ByteBuffer inside the `ByteBufferSerializer#serialize(String,
> ByteBuffer)`
> > it will break user code using `ByteBufferSerializer#serialize(String,
> > ByteBuffer)`, and if we don't flip the ByteBuffer inside
> > the `ByteBufferSerializer#serializeToByteBuffer(String, ByteBuffer)`, it
> > will be even more strange to the user, because
> > `ByteBufferSerializer#serialize(String, ByteBuffer)` and
> > `ByteBufferSerializer#serializeToByteBuffer(String, ByteBuffer)` require
> > users use the ByteBufferSerializer in two different ways. So if we think
> of
> > `ByteBufferSerialize#serializeToByteBuffer(String, ByteBuffer)` as
> setting
> > up a ByteBuffer to read later, is it more acceptable?
> >
> > For your second question, I plan to ultimately replace byte[] with
> > ByteBuffer, I will document the intent in your KIP and JavaDocs later.
> >
> > I will clarify that if a Serializer implements the new method, then the
> old
> > one will never be called.
> >
> > Best,
> > ShunKang
> >
> > John Roesler wrote on Friday, November 4, 2022 at 22:42:
> >
> >> Hi ShunKang,
> >>
> >> Thanks for the KIP!
> >>
> >> I’ve been wanting to transition toward byte buffers for a while, so this
> >> is a nice start.
> >>
> >> I thought it was a bit weird to flip the buffer inside the serializer,
> but
> >> I see the existing one already does that. I would have thought it would
> >> make more sense for the caller to give us a buffer already set up for
> >> reading. Do you think it makes sense to adopt this pattern for the new
> >> method?
> >>
> >> Do you plan to keep the new methods as optional indefinitely, or do you
> >> plan to ultimately replace byte[] with ByteBuffer? If it’s the latter,
> then
> >> it would be good to document the intent in your KIP and JavaDocs.
> >>
> >> It would be good to clarify that if a Serializer implements the new
> >> method, then the old one will never be called. That way, implementations
> >> can just throw an exception on that method instead of implementing both.
> >>
> >> Thanks again!
> >> -John
> >>
> >> On Wed, Nov 2, 2022, at 20:14, ShunKang Lin wrote:
> >> > Bump this thread again : )
> >> >
> >> > ShunKang Lin wrote on Sunday, September 25, 2022 at 23:59:
> >> >
> >> >> Hi all, I'd like to start a new discussion thread on KIP-872 (Kafka
> >> >> Client) which proposes that add Serializer#serializeToByteBuffer() to
> >> >> reduce memory copying.
> >> >>
> >> >> KIP:
> >> >>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828
> >> >> Thanks, ShunKang
> >> >>
> >>
>


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1340

2022-11-07 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-14344) Improve MM2 integration test by building EmbeddedKafkaCluster with common configs used for all clients

2022-11-07 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-14344.

Fix Version/s: 3.4.0
   Resolution: Fixed

> Improve MM2 integration test by building EmbeddedKafkaCluster with common 
> configs used for all clients
> --
>
> Key: KAFKA-14344
> URL: https://issues.apache.org/jira/browse/KAFKA-14344
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect, mirrormaker, unit tests
>Reporter: Omnia Ibrahim
>Assignee: Omnia Ibrahim
>Priority: Minor
> Fix For: 3.4.0
>
>
> Connect and MirrorMaker's integration tests use `EmbeddedKafkaCluster` and 
> `EmbeddedConnectCluster` to set up a Connect cluster during testing. Both 
> classes are easy to set up if the test needs vanilla clusters; however, it is a 
> lot of work to set them up with more advanced config (for example 
> authentication and authorization) where the admin, consumer and producer clients 
> need more configuration. 
> 1. I am proposing adding extra parameter `additionalClientConfigs` to 
> `EmbeddedKafkaCluster` constructor. The new parameter will be used
>  - Setup Producer Client in `EmbeddedKafkaCluster.doStart` which is 
> initializing `producer` client that is used in `EmbeddedKafkaCluster.produce`
>  - Setup Producer Client in `EmbeddedKafkaCluster.createProducer` used in 
> `EmbeddedKafkaCluster.transactionalProducer`
>  - Setup Admin Client in `EmbeddedKafkaCluster.createAdminClient` used in 
> `EmbeddedKafkaCluster.createTopic`, `EmbeddedKafkaCluster.consumeAll`, 
> `EmbeddedKafkaCluster.describeTopics` and `EmbeddedKafkaCluster.deleteTopic`
>  - Setup Consumer Client in `EmbeddedKafkaCluster.createConsumer` used in 
> `EmbeddedKafkaCluster.createConsumerAndSubscribeTo` and 
> `EmbeddedKafkaCluster.consumeAll`
> 2. And add 
> `EmbeddedConnectCluster.Builder.additionalKafkaClusterClientConfigs`.
>  
> Tests impacted by this 
> - MirrorMaker integration tests
> - `org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Re: [DISCUSS] KIP-710: Full support for distributed mode in dedicated MirrorMaker 2.0 clusters

2022-11-07 Thread Mickael Maison
Hi Daniel,

Thanks for the KIP.

It would be nice to expose more of the REST API as some endpoints are
really useful, for example /admin or
/connectors//tasks-config. However as dedicated mode is
currently unusable, I think the approach of "just fixing it" by
exposing the internal endpoints is fine. It also does not seem to
corner us too much if we decide to make further changes in the future.

One suggestion I have is to avoid using "mm" in the configuration
name. Could we rename mm.enable.internal.rest to
dedicated.mode.enable.internal.rest or something like that?

Thanks,
Mickael

On Tue, Sep 27, 2022 at 3:56 PM Chris Egerton  wrote:
>
> Thanks Daniel! No further comments from me, looks good.
>
> On Tue, Sep 27, 2022 at 4:39 AM Dániel Urbán  wrote:
>
> > Hi Chris,
> >
> > I understand your points, and I agree that this path is safer in terms of
> > future plans, I accept it.
> > Updated the KIP accordingly.
> >
> > Thanks,
> > Daniel
> >
> > Chris Egerton wrote on Wednesday, September 21, 2022 at 18:54:
> >
> > > Hi Daniel,
> > >
> > > I'm a little hesitant to add support for REST extensions and the admin
> > API
> > > to dedicated MM2 nodes because they may restrict our options in the
> > future
> > > if/when we add a public-facing REST API.
> > >
> > > Regarding REST extensions specifically, I understand their security value
> > > for public-facing APIs, but for the internal API--which should only ever
> > be
> > > targeted by MM2 nodes, and never by humans, UIs, CLIs, etc.--I'm not sure
> > > there's enough need there to add that API this time around. The internal
> > > endpoints used by Kafka Connect should be secured by the session key
> > > mechanism introduced in KIP-507 [1], and IIUC, with this KIP, users will
> > > also be able to configure their cluster to use mTLS.
> > >
> > > Regarding the admin API, I consider it part of the public-facing REST API
> > > for Kafka Connect, so I was assuming we wouldn't want to add it to this
> > KIP
> > > since we're intentionally slimming down the REST API to just the bare
> > > essentials (i.e., just the internal endpoints). I can see value for it,
> > but
> > > it's similar to the status endpoints in the Kafka Connect REST API; we
> > > might choose to expose this sometime down the line, but IMO it'd be
> > better
> > > to do that in a separate KIP so that we can iron out the details of
> > exactly
> > > what kind of REST API would best suit dedicated MM2 clusters, and how
> > that
> > > would differ from the API provided by vanilla Kafka Connect.
> > >
> > > Let me know what you think!
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > [1] -
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-507%3A+Securing+Internal+Connect+REST+Endpoints
> > >
> > > On Wed, Sep 21, 2022 at 4:59 AM Dániel Urbán 
> > > wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > About the worker id: makes sense. I changed the wording, but kept it
> > > listed
> > > > as it is a change compared to existing MM2 code. Updated the KIP
> > > > accordingly.
> > > >
> > > > About the REST server configurations: again, I agree, it should be
> > > > generalized. But I'm not sure about those exceptions you listed, as all
> > > of
> > > > them make sense in MM2 as well. For example, security related rest
> > > > extensions could work with MM2 as well. Admin listeners are also
> > useful,
> > > as
> > > > they would allow the same admin operations for MM2 nodes. Am I mistaken
> > > > here? Updated the KIP without mentioning exceptions for now.
> > > >
> > > > I think yes, the lazy config resolution should be unconditional. Even
> > if
> > > we
> > > > don't consider the distributed mode of MM2, the eager resolution does
> > not
> > > > allow using sensitive configs in the mm2 properties, as they will be
> > > > written by value into the config topic. I didn't really consider this
> > as
> > > > breaking change, but I might be wrong.
> > > >
> > > > Enable flag property name: also makes sense, updated the KIP.
> > > >
> > > > Thanks a lot!
> > > > Daniel
> > > >
> > > > Chris Egerton wrote on Tuesday, September 20, 2022 at 20:29:
> > > >
> > > > > Hi Daniel,
> > > > >
> > > > > Looking pretty good! Regarding the worker ID generation--apologies, I
> > > was
> > > > > unclear with my question. I was wondering if we had to adopt any
> > > special
> > > > > logic at all for MM2, or if we could use the same logic that vanilla
> > > > Kafka
> > > > > Connect does in distributed mode, where the worker ID is its
> > advertised
> > > > URL
> > > > > (e.g., "connect:8083" or "localhost:25565"). Unless I'm mistaken,
> > this
> > > > > should allow MM2 nodes to be identified unambiguously. Do you think
> > it
> > > > > makes sense to follow this strategy?
> > > > >
> > > > > Now that the details on the new REST interface have been fleshed out,
> > > I'm
> > > > > also wondering if we want to add support for the "
> > > > > rest.advertised.host.name",
> > > > > 

[jira] [Resolved] (KAFKA-13887) Running multiple instance of same stateful KafkaStreams application on single host raise Exception

2022-11-07 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13887.
---
Resolution: Not A Problem

> Running multiple instance of same stateful KafkaStreams application on single 
> host raise Exception
> --
>
> Key: KAFKA-13887
> URL: https://issues.apache.org/jira/browse/KAFKA-13887
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sina Askarnejad
>Priority: Minor
>
> KAFKA-10716 locks the state store directory on the running host, as it stores 
> the processId in a *kafka-streams-process-metadata* file in this path. As a 
> result, to run multiple instances of the same application on a single host, 
> each instance must run with a different *state.dir* config; otherwise the 
> following exception will be raised for the second instance:
>  
> Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: 
> Unable to initialize state, this can happen if multiple instances of Kafka 
> Streams are running in the same state directory
> at 
> org.apache.kafka.streams.processor.internals.StateDirectory.initializeProcessId(StateDirectory.java:191)
> at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:868)
> at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:851)
> at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:821)
> at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:733)
>  
> The easiest solution is multi-threading: running a single instance with multiple 
> threads. But multi-threading is not suitable for all scenarios, e.g., when the 
> tasks are CPU intensive, in large-scale scenarios, or when fully utilizing 
> multi-core CPUs.
>  
> The second solution is multi-processing. On a single host this needs 
> extra work and coordination, as each instance needs to be run with a different 
> {*}state.dir{*}. It would be a good enhancement if KafkaStreams could handle this 
> config for multiple instances.
>  
> The proposed solution is that the KafkaStreams use the 
> */\{state.dir}/\{application.id}/\{ordinal.number}* path instead of 
> */\{state.dir}/\{application.id}* to store the meta file and states. The 
> *ordinal.number* starts with 0 and is incremental.
> When an instance starts, it checks the ordinal.number directories starting from 0, 
> finds the first subdirectory that is not locked, and uses that as its 
> state directory. This way all tasks are assigned correctly on rebalance and 
> multiple instances can be run on a single host.
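
For reference, the existing way to run multiple instances on a single host is to 
give each instance its own state directory; a minimal sketch (paths, topics and 
application id are illustrative):
{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class PerInstanceStateDir {
    public static void main(String[] args) {
        String instanceId = args.length > 0 ? args[0] : "0";
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");              // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // illustrative
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        // Each instance on the host gets its own state directory, so the directory lock
        // from KAFKA-10716 never collides between instances.
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams/my-app-" + instanceId);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");                      // placeholder topology
        Topology topology = builder.build();

        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
    }
}
{code}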



--
This message was sent by Atlassian Jira
(v8.20.10#820010)