[jira] [Updated] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434

2023-10-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15319:

Fix Version/s: 3.5.2

> Upgrade rocksdb to fix CVE-2022-37434
> -
>
> Key: KAFKA-15319
> URL: https://issues.apache.org/jira/browse/KAFKA-15319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.1
>Reporter: Maruthi
>Assignee: Lucas Brutschy
>Priority: Critical
> Fix For: 3.6.0, 3.5.2
>
> Attachments: compat_report.html.zip
>
>
> Rocksdbjni < 7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12.
> Upgrade to zlib 1.2.13 to fix; see
> https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15690) EosIntegrationTest is flaky.

2023-10-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15690:

Component/s: streams
 unit tests

> EosIntegrationTest is flaky.
> 
>
> Key: KAFKA-15690
> URL: https://issues.apache.org/jira/browse/KAFKA-15690
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Calvin Liu
>Priority: Major
>
> EosIntegrationTest
> shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances[exactly_once_v2, processing threads = false]
> {code:java}
> org.junit.runners.model.TestTimedOutException: test timed out after 600 seconds
>     at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleServerDisconnect(NetworkClient.java:)
>     at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:821)
>     at org.apache.kafka.clients.NetworkClient.processTimeoutDisconnection(NetworkClient.java:779)
>     at org.apache.kafka.clients.NetworkClient.handleTimedOutRequests(NetworkClient.java:837)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-00, topic=multiPartitionInputTopic, partition=1, offset=15, stacktrace=java.lang.RuntimeException: Detected we've been interrupted.
>     at org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892)
>     at org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867)
>     at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
>     at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
>     at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
> {code}
>   shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2, processing threads = false]
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request.
>     at org.apache.kafka.streams.integration.utils.KafkaEmbedded.deleteTopic(KafkaEmbedded.java:204)
>     at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:286)
>     at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:274)
>     at org.apache.kafka.streams.integration.EosIntegrationTest.createTopics(EosIntegrationTest.java:174)
>     at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-00, topic=multiPartitionInputTopic, partition=1, offset=15, stacktrace=java.lang.RuntimeException: Detected we've been interrupted.
>     at org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892)
>     at org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867)
>     at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
>     at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
>     at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
> {code}
> shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once, processing threads = false]
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. StreamsTasks did not request commit. ==> expected:  but was: 
>     at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>     at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>     at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
> java.lang.IllegalStateException: Replica
> [Topic=__transaction_state,
Re: [DISCUSS] KIP-992 Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery

2023-10-26 Thread Matthias J. Sax

Would we really get a ClassCastException?

From my understanding, the store would reject the query as unsupported and thus the returned `QueryResult` object would have its internal flag set to indicate the failure, but no exception would be thrown directly?


(Of course, there might be an exception thrown to the user if they don't check the `isSuccess()` flag but call `getResult()` directly.)
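
For illustration, a minimal sketch of that pattern against the existing IQv2 API (the store name, key, and types are made up for the example, and a running KafkaStreams instance is assumed):

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;

public class Iqv2QueryExample {
    // Assumes a running KafkaStreams instance with a key-value store named "my-store".
    static void queryStore(KafkaStreams streams) {
        StateQueryRequest<String> request =
                StateQueryRequest.inStore("my-store")
                        .withQuery(KeyQuery.<String, String>withKey("some-key"));
        StateQueryResult<String> result = streams.query(request);
        QueryResult<String> partitionResult = result.getOnlyPartitionResult();
        if (partitionResult != null && partitionResult.isSuccess()) {
            System.out.println("value = " + partitionResult.getResult());
        } else if (partitionResult != null) {
            // An unsupported query surfaces here via the failure flag rather than a thrown exception.
            System.out.println("query failed: " + partitionResult.getFailureReason()
                    + " / " + partitionResult.getFailureMessage());
        }
    }
}
{code}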



-Matthias

On 10/25/23 8:55 AM, Hanyu (Peter) Zheng wrote:

Hi, Bill,
Thank you for your reply. Yes, now, if a user executes a timestamped query against a non-timestamped store, it will throw a ClassCastException.
If a user uses KeyQuery to query a kv-store or ts-kv-store, it always returns V. If a user uses TimestampedKeyQuery to query a kv-store, it will throw an exception, so a TimestampedKeyQuery can only query a ts-kv-store and will return a ValueAndTimestamp object in the end.

Sincerely,
Hanyu

On Wed, Oct 25, 2023 at 8:51 AM Hanyu (Peter) Zheng 
wrote:


Thank you Lucas,

I will fix the capitalization.
When a user executes a timestamped query against a non-timestamped store, it will throw a ClassCastException.

Sincerely,
Hanyu

On Tue, Oct 24, 2023 at 1:36 AM Lucas Brutschy
 wrote:


Hi Hanyu,

reading the KIP, I was wondering the same thing as Bill.

Other than that, this looks good to me. Thanks for the KIP.

nit: you have method names `LowerBound` and `UpperBound`, where you
probably want to fix the capitalization.

Cheers,
Lucas

On Mon, Oct 23, 2023 at 5:46 PM Bill Bejeck  wrote:


Hey Hanyu,

Thanks for the KIP, it's a welcomed addition.
Overall, the KIP looks good to me, I just have one comment.

Can you discuss the expected behavior when a user executes a timestamped query against a non-timestamped store? I think it should throw an exception vs. using some default value.
If it's the case that Kafka Streams wraps all stores in a `TimestampAndValue` store, and whether a plain `V` or a `TimestampAndValue` object is returned depends on the query type, then it would be good to add those details to the KIP.

Thanks,
Bill



On Fri, Oct 20, 2023 at 5:07 PM Hanyu (Peter) Zheng
 wrote:


Thank you Matthias,

I will modify the KIP to eliminate this restriction.

Sincerely,
Hanyu

On Fri, Oct 20, 2023 at 2:04 PM Hanyu (Peter) Zheng <

pzh...@confluent.io>

wrote:


Thank you Alieh,

In these two new query types, I will remove 'get' from all getter method names.

Sincerely,
Hanyu

On Fri, Oct 20, 2023 at 10:40 AM Matthias J. Sax 

wrote:



Thanks for the KIP Hanyu,

One question:

To address this inconsistency, we propose that KeyQuery should be restricted to querying kv-stores only, ensuring that it always returns a plain V type, making the behavior of the aforementioned code more predictable. Similarly, RangeQuery should be dedicated to querying kv-stores, consistently returning only the plain V.

Why do you want to restrict `KeyQuery` and `RangeQuery` to kv-stores? I think it would be possible to still allow both queries for ts-kv-stores, but change the implementation to return "plain V" instead of `ValueAndTimestamp`, i.e., the implementation would automatically unwrap the value.



-Matthias

On 10/20/23 2:32 AM, Alieh Saeedi wrote:

Hey Hanyu,

Thanks for the KIP. It seems good to me.
Just one point: AFAIK, we are going to remove "get" from the name of all getter methods.

Cheers,
Alieh

On Thu, Oct 19, 2023 at 5:44 PM Hanyu (Peter) Zheng
 wrote:


Hello everyone,

I would like to start the discussion for KIP-992: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery.

The KIP can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampedKeyQuery+and+TimestampedRangeQuery


Any suggestions are more than welcome.

Many thanks,
Hanyu


Re: [VOTE] KIP-988 Streams StandbyUpdateListener

2023-10-26 Thread Matthias J. Sax

+1 (binding)

On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote:

Happy to see this -- that's a +1 (binding) from me

On Mon, Oct 23, 2023 at 6:33 AM Bill Bejeck  wrote:


This is a great addition

+1(binding)

-Bill

On Fri, Oct 20, 2023 at 2:29 PM Almog Gavra  wrote:


+1 (non-binding) - great improvement, thanks Colt & Eduwer!

On Tue, Oct 17, 2023 at 11:25 AM Guozhang Wang <guozhang.wang...@gmail.com> wrote:

+1 from me.

On Mon, Oct 16, 2023 at 1:56 AM Lucas Brutschy
 wrote:


Hi,

thanks again for the KIP!

+1 (binding)

Cheers,
Lucas



On Sun, Oct 15, 2023 at 9:13 AM Colt McNealy 

wrote:


Hello there,

I'd like to call a vote on KIP-988 (co-authored by my friend and colleague Eduwer Camacaro). We are hoping to get it in before the 3.7.0 release.








https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener


Cheers,
Colt McNealy

*Founder, LittleHorse.dev*










Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-26 Thread Matthias J. Sax

Thanks. SGTM.

On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote:

That all sounds good to me! Thanks for the KIP

On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy  wrote:


Hi Sophie, Matthias, Bruno, and Eduwer—

Thanks for your patience as I have been scrambling to catch up after a week
of business travel (and a few days with no time to code). I'd like to tie
up some loose ends here, but in short, I don't think the KIP document
itself needs any changes (our internal implementation does, however).

1. In the interest of a) not changing the KIP after it's already out for a vote, and b) making sure our English grammar is "correct", let's stick with `onBatchLoaded()`. It is the Store that gets updated, not the Batch.

2. For me (and, thankfully, the community as well) adding a remote network
call at any point in this KIP is a non-starter. We'll ensure that
our implementation does not introduce one.

3. I really don't like changing API behavior, even if it's not documented
in the javadoc. As such, I am strongly against modifying the behavior of
endOffsets() on the consumer as some people may implicitly depend on the
contract.
3a. The Consumer#currentLag() method gives us exactly what we want without
a network call (current lag from a cache, from which we can compute the
offset).

4. I have no opinion about whether we should pass endOffset or currentLag
to the callback. Either one has the same exact information inside it. In
the interest of not changing the KIP after the vote has started, I'll leave
it as endOffset.

As such, I believe the KIP doesn't need any updates, nor has it been
updated since the vote started.

Would anyone else like to discuss something before the Otter Council
adjourns regarding this matter?

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*
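
To make Colt's point 3a above concrete, here is a rough sketch of deriving a partition's end offset from the consumer's cached lag instead of calling endOffsets(); the helper name is made up and this is not part of the KIP-988 API:

{code:java}
import java.util.OptionalLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class EndOffsetFromLag {
    // Illustrative helper only: end offset = current position + cached lag,
    // avoiding the broker round trip that endOffsets() would make.
    static OptionalLong endOffsetFromLag(Consumer<?, ?> consumer, TopicPartition tp) {
        OptionalLong lag = consumer.currentLag(tp); // served from the consumer's local metadata
        if (!lag.isPresent()) {
            return OptionalLong.empty(); // lag not known yet, e.g. before the first fetch completes
        }
        long position = consumer.position(tp); // next offset the consumer will read
        return OptionalLong.of(position + lag.getAsLong());
    }
}
{code}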


On Mon, Oct 23, 2023 at 10:44 PM Sophie Blee-Goldman <
sop...@responsive.dev>
wrote:


Just want to checkpoint the current state of this KIP and make sure we're
on track to get it in to 3.7 (we still have a few weeks)  -- looks like
there are two remaining open questions, both relating to the
middle/intermediate callback:

1. What to name it: seems like the primary candidates are onBatchLoaded and onBatchUpdated (and maybe also onStandbyUpdated?)
2. What additional information can we pass in that would strike a good balance between being helpful and impacting performance.

Regarding #1, I think all of the current options are reasonable enough that we should just let Colt decide which he prefers. I personally think #onBatchUpdated is fine -- Bruno does make a fair point but the truth is that English grammar can be sticky and while it could be argued that it is the store which is updated, not the batch, I feel that it is perfectly clear what is meant by "onBatchUpdated" and to me, this doesn't sound weird at all. That's just my two cents in case it helps, but again, whatever makes sense to you Colt is fine

When it comes to #2 -- as much as I would love to dig into the Consumer client lore and see if we can modify existing APIs or add new ones in order to get the desired offset metadata in an efficient way, I think we're starting to go down a rabbit hole that is going to expand the scope way beyond what Colt thought he was signing up for. I would advocate to focus on just the basic feature for now and drop the end-offset from the callback. Once we have a standby listener it will be easy to expand on with a followup KIP if/when we find an efficient way to add additional useful information. I think it will also become more clear what is and isn't useful after more people get to using it in the real world

Colt/Eduwer: how necessary is receiving the end offset during a batch
update to your own application use case?

Also, for those who really do need to check the current end offset, I believe in theory you should be able to use the KafkaStreams#metrics API to get the current lag and/or end offset for the changelog -- it's possible this does not represent the most up-to-date end offset (I'm not sure it does or does not), but it should be close enough to be reliable and useful for the purpose of monitoring -- I mean it is a metric, after all.
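
As a rough sketch of that monitoring approach (the "records-lag" name filter is an assumption; check which metrics are actually registered in your own deployment rather than relying on it):

{code:java}
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class LagMetricsDump {
    // Prints any consumer lag metrics exposed through KafkaStreams#metrics().
    static void printLagMetrics(KafkaStreams streams) {
        for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            MetricName name = entry.getKey();
            if (name.name().contains("records-lag")) {
                System.out.println(name.name() + " " + name.tags() + " = " + entry.getValue().metricValue());
            }
        }
    }
}
{code}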

Hope this helps -- in the end, it's up to you (Colt) to decide what you want to bring in scope or not. We still have more than 3 weeks until the KIP freeze as currently proposed, so in theory you could even implement this KIP without the end offset and then do a followup KIP to add the end offset within the same release, i.e., without any deprecations. There are plenty of paths forward here, so don't let us drag this out forever if you know what you want

Cheers,
Sophie

On Fri, Oct 20, 2023 at 10:57 AM Matthias J. Sax 

wrote:



Forgot one thing:

We could also pass `currentLag()` into `onBatchLoaded()` instead of the end-offset.


-Matthias

On 10/20/23 10:56 AM, Matthias J. Sax wrote:

Thanks for digging into this Bruno.

The JavaDoc on the consumer does not say anything

[jira] [Commented] (KAFKA-12550) Introduce RESTORING state to the KafkaStreams FSM

2023-10-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780015#comment-17780015
 ] 

Matthias J. Sax commented on KAFKA-12550:
-

Hmmm... Aren't both things kinda independent? In the end, as an operator I might still be interested in seeing whether the state updater thread is doing active restore (or maintaining a standby, or doing nothing)?

> Introduce RESTORING state to the KafkaStreams FSM
> -
>
> Key: KAFKA-12550
> URL: https://issues.apache.org/jira/browse/KAFKA-12550
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> We should consider adding a new state to the KafkaStreams FSM: RESTORING
> This would cover the time between the completion of a stable rebalance and 
> the completion of restoration across the client. Currently, Streams will 
> report the state during this time as REBALANCING even though it is generally 
> spending much more time restoring than rebalancing in most cases.
> There are a few motivations/benefits behind this idea:
> # Observability is a big one: using the umbrella REBALANCING state to cover 
> all aspects of rebalancing -> task initialization -> restoring has been a 
> common source of confusion in the past. It’s also proved to be a time sink 
> for us, during escalations, incidents, mailing list questions, and bug 
> reports. It often adds latency to escalations in particular as we have to go 
> through GTS and wait for the customer to clarify whether their “Kafka Streams 
> is stuck rebalancing” ticket means that it’s literally rebalancing, or just 
> in the REBALANCING state and actually stuck elsewhere in Streams
> # Prereq for global thread improvements: for example [KIP-406: 
> GlobalStreamThread should honor custom reset policy 
> |https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy]
>  was ultimately blocked on this as we needed to pause the Streams app while 
> the global thread restored from the appropriate offset. Since there’s 
> absolutely no rebalancing involved in this case, piggybacking on the 
> REBALANCING state would just be shooting ourselves in the foot.
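
To make the observability point concrete, here is a minimal sketch of how an operator watches these transitions today (application id, bootstrap servers, and topic names are placeholders; a RESTORING state does not exist yet -- that is exactly what this ticket proposes):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class StateMonitorExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-monitor-example"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // trivial topology just for the example

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Today this listener reports REBALANCING for the whole rebalance + restore phase;
        // a dedicated RESTORING state would let operators tell the two apart.
        streams.setStateListener((newState, oldState) ->
                System.out.println("Streams state: " + oldState + " -> " + newState));
        streams.start();
    }
}
{code}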



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780013#comment-17780013
 ] 

Matthias J. Sax commented on KAFKA-15602:
-

> Ah, nah, ByteBuffer.wrap("test".getBytes()) yields a ByteBuffer with 
> position=0 limit=4; i.e., it's already ready to be read.

Ah, great. For this case, there is not even a backward compatibility concern. – 
Should make it pretty easy to get the KIP approved for this case.
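
A small self-contained sketch of the contract being discussed (this is not Kafka's serializer; it just reads from position to limit, which is what a caller-prepares-the-buffer contract implies):

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferReadContractDemo {
    // Sketch only: copy the bytes between position and limit, leaving the caller's buffer untouched.
    static byte[] positionToLimit(ByteBuffer data) {
        ByteBuffer view = data.duplicate();
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        // Case from the comment above: wrap() leaves position=0, limit=4 -- already readable.
        ByteBuffer wrapped = ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(positionToLimit(wrapped), StandardCharsets.UTF_8)); // test

        // allocate()+put() leaves position=4, limit=8 -- the caller must flip() before reading.
        ByteBuffer built = ByteBuffer.allocate(8);
        built.put("test".getBytes(StandardCharsets.UTF_8));
        built.flip(); // position=0, limit=4
        System.out.println(new String(positionToLimit(built), StandardCharsets.UTF_8)); // test
    }
}
{code}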

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produce 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 
> val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 
> val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8));
> buff.limit(buff.position());|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails for both cases for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers start have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> ... // some sequence of 
> bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
> ... 
> bb.flip(); /* logically, this says "I am done writing, let's set this u

[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-24 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779216#comment-17779216
 ] 

Matthias J. Sax commented on KAFKA-15602:
-

{quote}I'm not entirely sure I understand Matthias's point regarding example 1,
{quote}
Well, in example one, the position is at 4, and if we say we want to respect the position, the result would be empty – the user would need to rewind to zero before calling the serializer to make it work. Does this make sense?

We can discuss details on a KIP, but instead of introducing a new class, I was wondering whether we should use a config `enable.auto.rewind=true` (by default) that users can set to `false` to get the new behavior. For this case, we don't break compatibility, and it gives users the ability to add an explicit rewind call before calling serialize, before they change the config to `false`.
{quote}Can you create a Jira to document
{quote}
I think we can re-open K4852 for this purpose?
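
A rough sketch of how such a config-gated serializer could look; the `enable.auto.rewind` name is only the proposal from this comment, not an existing Kafka config:

{code:java}
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical sketch, not Kafka's implementation.
public class ConfigurableByteBufferSerializer implements Serializer<ByteBuffer> {
    private boolean autoRewind = true; // proposed default: keep the old rewind() behavior

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Object value = configs.get("enable.auto.rewind"); // proposed config name, see comment above
        if (value != null) {
            autoRewind = Boolean.parseBoolean(value.toString());
        }
    }

    @Override
    public byte[] serialize(String topic, ByteBuffer data) {
        if (data == null) {
            return null;
        }
        ByteBuffer view = data.duplicate();
        if (autoRewind) {
            view.rewind(); // old contract: always serialize from position 0 up to the limit
        }
        byte[] out = new byte[view.remaining()]; // new contract: serialize from position up to the limit
        view.get(out);
        return out;
    }
}
{code}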

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produce 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 
> val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 
> val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8));
> buff.limit(buff.position());|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails for both cases for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers start have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays 

[jira] [Comment Edited] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778731#comment-17778731
 ] 

Matthias J. Sax edited comment on KAFKA-15602 at 10/23/23 4:20 PM:
---

Btw: also left a comment on K4852: 
https://issues.apache.org/jira/browse/KAFKA-4852?focusedCommentId=17778727=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17778727
 


was (Author: mjsax):
Btw: also left a comment on K4852: 
https://issues.apache.org/jira/browse/KAFKA-4852

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produce 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 
> val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 
> val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8));
> buff.limit(buff.position());|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails for both cases for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers start have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> ... // some sequence of 
> bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
> ... 
> bb.flip(); /* l

[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778731#comment-17778731
 ] 

Matthias J. Sax commented on KAFKA-15602:
-

Btw: also left a comment on K4852: 
https://issues.apache.org/jira/browse/KAFKA-4852

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produce 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 
> val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 
> val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8));
> buff.limit(buff.position());|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails for both cases for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers start have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> ... // some sequence of 
> bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
> ... 
> bb.flip(); /* logically, this says "I am done writing, let's set this up for 
> reading"; pragmatically, it sets the limit to the current position so that 
> whoever reads the buffer knows when to stop reading, and sets the position to 
> zero so it knows where to 

[jira] [Commented] (KAFKA-4852) ByteBufferSerializer not compatible with offsets

2023-10-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778727#comment-17778727
 ] 

Matthias J. Sax commented on KAFKA-4852:


As reported on https://issues.apache.org/jira/browse/KAFKA-15602, this ticket 
introduces a couple of breaking changes and bugs.
 # Semantic changes like this need to be backed up by a KIP
 # The code does not achieve what it aims to do
 # There is a lack of proper unit testing

Given that such a change requires a KIP and the current code is broken, we propose to revert this change for the time being, and do a proper fix via a KIP. I prepared a PR for this: [https://github.com/apache/kafka/pull/14617]

\cc [~guozhang] 

> ByteBufferSerializer not compatible with offsets
> 
>
> Key: KAFKA-4852
> URL: https://issues.apache.org/jira/browse/KAFKA-4852
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
> Environment: all
>Reporter: Werner Daehn
>Assignee: LinShunkang
>Priority: Minor
> Fix For: 3.4.0
>
>
> Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the 
> ByteBuffer was created with an offset? new ByteBuffer(data, 3, 10)? The 
> ByteBufferSerializer will send from pos=0 and not from pos=3 onwards.
> Solution: No rewind() but flip() for reading a ByteBuffer. That's what the 
> flip is meant for.
> Story:
> Imagine the incoming data comes from a byte[], e.g. a network stream 
> containing topicname, partition, key, value, ... and you want to create a new 
> ProducerRecord for that. As the constructor of ProducerRecord requires 
> (topic, partition, key, value) you have to copy from above byte[] the key and 
> value. That means there is a memcopy taking place. Since the payload can be 
> potentially large, that introduces a lot of overhead. Twice the memory.
> A nice solution to this problem is to simply wrap the network byte[] into new 
> ByteBuffers:
> ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength);
> ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength);
> and then use the ByteBufferSerializer instead of the ByteArraySerializer.
> But that does not work as the ByteBufferSerializer does a rewind(), hence 
> both, key and value, will start at position=0 of the data[].
> public class ByteBufferSerializer implements Serializer<ByteBuffer> {
> public byte[] serialize(String topic, ByteBuffer data) {
>  data.rewind();
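
A minimal sketch of the failure mode described in this ticket (not Kafka code; it just shows that rewind() discards the offset that wrap(array, offset, length) encoded in the position):

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RewindVsOffsetDemo {
    public static void main(String[] args) {
        byte[] network = "topickeyvalue".getBytes(StandardCharsets.UTF_8);

        // Wrap just the "key" portion without copying: position=5, limit=8.
        ByteBuffer key = ByteBuffer.wrap(network, 5, 3);

        // What a rewind()-based serializer effectively does: position goes back to 0,
        // so the bytes come from the start of the backing array, not from the offset.
        ByteBuffer rewound = key.duplicate();
        rewound.rewind();
        byte[] out = new byte[rewound.remaining()];
        rewound.get(out);
        System.out.println(new String(out, StandardCharsets.UTF_8)); // prints "topickey", not "key"
    }
}
{code}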



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778719#comment-17778719
 ] 

Matthias J. Sax commented on KAFKA-15602:
-

Sounds fair. – Let me revert K4852 in all applicable branches and re-open it 
for a proper KIP. – Guess we can close this ticket afterwards?

 

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produce 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 
> val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 
> val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8));
> buff.limit(buff.position());|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails for both cases for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers start have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> ... // some sequence of 
> bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
> ... 
> bb.flip(); /* logically, this says "I am done writing, let's set this up for 
> reading"; pragmatically, it sets the limit to the current position so that 
> whoever reads the buffer knows when to stop reading

[jira] [Resolved] (KAFKA-15666) Добавить функцию поиска по почте

2023-10-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15666.
-
Resolution: Invalid

[~noraverba] – We can only take tickets in English. – I also piped the title through a translator and it did not really make sense to me, especially as the description is empty.

Closing as invalid.

> Добавить функцию поиска по почте (English: "Add a mail search function")
> 
>
> Key: KAFKA-15666
> URL: https://issues.apache.org/jira/browse/KAFKA-15666
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Eleonora
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[jira] [Created] (KAFKA-15672) Add 3.6 to streams system tests

2023-10-23 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15672:
---

 Summary: Add 3.6 to streams system tests
 Key: KAFKA-15672
 URL: https://issues.apache.org/jira/browse/KAFKA-15672
 Project: Kafka
  Issue Type: Test
  Components: streams, system tests
Reporter: Matthias J. Sax


3.6.0 was released recently. We need to add `3.6.0` to the system tests (in particular, upgrade and broker compatibility tests).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[jira] [Comment Edited] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-22 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778511#comment-17778511
 ] 

Matthias J. Sax edited comment on KAFKA-15602 at 10/23/23 4:09 AM:
---

Hey, [~pnee] pointed me to this ticket. I am not a ByteBuffer expert, but it seems to me that a reasonable API contract would be that the user is responsible for preparing the buffer for reading, and the returned byte[] array is from the buffer's position to its limit. Any other behavior seems odd to me. Thoughts? (Not sure if the position should be modified by the serializer or not, though? I tend to think yes?)

Overall, it seems like a case to fix forward? If the above proposed contract makes sense, it's rather a bug fix and we should just push it out? (In the end KAFKA-4852 was done without a KIP, too – kinda borderline to begin with...)

For example (1), it seems it would be a breaking change though? Given that this is indeed the "prime" use-case, doing a KIP might indeed be better? (For this case, reverting KAFKA-4852 might indeed be a good call, as it seems to break more than it fixes.)

For example (2), pre-3.4 seems to be correct, while 3.4+ is broken.

For example (3), pre-3.4 seems to be correct, while 3.4+ seems to be incorrect 
– the user did not limit to 4 bytes, so all 8 bytes should be returned IMHO?

For example (4), pre-3.4 seems to be incorrect, while 3.4+ seems to be 
incorrect, too? In the end, the position was not set back to zero, hence a 
zero-length array should be returned?

And last example (5), as already discussed, is also broken in all versions, but 
should result "est".
{quote}I can see the problem comes from the serializer doesn't know if the 
position is an offset or just the next byte to be written.  These are two 
different definitions of the position so it doesn't really make sense to handle 
both cases in a single API call.
{quote}
Not sure if I can follow here? What do you mean by "is an offset or just the next byte to be written"? Why would position have two different definitions?

\cc [~showuon] as RM for the 3.5.2 release – might be worth getting this fix in, too?


was (Author: mjsax):
Hey, [~pnee] pointed me to this ticket. I am not a ByteBuffer expert, but it seems to me that a reasonable API contract would be that the user is responsible for preparing the buffer for reading, and the returned byte[] array is from the buffer's position to its limit. Any other behavior seems odd to me. Thoughts? (Not sure if the position should be modified by the serializer or not, though? I tend to think yes?)

Overall, it seems like a case to fix forward? If the above proposed contract makes sense, it's rather a bug fix and we should just push it out? (In the end KAFKA-4852 was done without a KIP, too – kinda borderline to begin with...)

For example (1), it seems it would be a breaking change though?

For example (2), pre-3.4 seems to be correct, while 3.4+ is broken.

For example (3), pre-3.4 seems to be correct, while 3.4+ seems to be incorrect 
– the user did not limit to 4 bytes, so all 8 bytes should be returned IMHO?

For example (4), pre-3.4 seems to be incorrect, while 3.4+ seems to be 
incorrect, too? In the end, the position was not set back to zero, hence a 
zero-length array should be returned?

And last example (5), as already discussed, is also broken in all versions, but 
should result "est".
{quote}I can see the problem comes from the serializer doesn't know if the 
position is an offset or just the next byte to be written.  These are two 
different definitions of the position so it doesn't really make sense to handle 
both cases in a single API call.
{quote}
Not sure if I can follow here? What do you mean by "is an offset or just the next byte to be written"? Why would position have two different definitions?

\cc [~showuon] as RM for the 3.5.2 release – might be worth getting this fix in, too?

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input Byte

[jira] [Comment Edited] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-22 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778511#comment-17778511
 ] 

Matthias J. Sax edited comment on KAFKA-15602 at 10/23/23 4:06 AM:
---

Hey, [~pnee] pointed me to this ticket. I am not a ByteBuffer expert, but it seems to me that a reasonable API contract would be that the user is responsible for preparing the buffer for reading, and the returned byte[] array is from the buffer's position to its limit. Any other behavior seems odd to me. Thoughts? (Not sure if the position should be modified by the serializer or not, though? I tend to think yes?)

Overall, it seems like a case to fix forward? If the above proposed contract makes sense, it's rather a bug fix and we should just push it out? (In the end KAFKA-4852 was done without a KIP, too – kinda borderline to begin with...)

For example (1), it seems it would be a breaking change though?

For example (2), pre-3.4 seems to be correct, while 3.4+ is broken.

For example (3), pre-3.4 seems to be correct, while 3.4+ seems to be incorrect 
– the user did not limit to 4 bytes, so all 8 bytes should be returned IMHO?

For example (4), pre-3.4 seems to be incorrect, while 3.4+ seems to be 
incorrect, too? In the end, the position was not set back to zero, hence a 
zero-length array should be returned?

And last example (5), as already discussed, is also broken in all versions, but 
should result "est".
{quote}I can see the problem comes from the serializer doesn't know if the 
position is an offset or just the next byte to be written.  These are two 
different definitions of the position so it doesn't really make sense to handle 
both cases in a single API call.
{quote}
Not sure if I can follow here? What do you mean by "is an offset or just the next byte to be written"? Why would position have two different definitions?

\cc [~showuon] as RM for the 3.5.2 release – might be worth getting this fix in, too?


was (Author: mjsax):
Hey, [~pnee] pointed me to this ticket. I am not a ByteBuffer expert, but it seems to me that a reasonable API contract would be that the user is responsible for preparing the buffer for reading, and the returned byte[] array is from the buffer's position to its limit. Any other behavior seems odd to me. Thoughts? (Not sure if the position should be modified by the serializer or not, though? I tend to think yes?)

Overall, it seems like a simple case to fix forward? Given that the standard use case (position = 0 and limit = max) works in all releases (example (1) in the ticket description), I am not really worried about introducing a breaking change. If the above proposed contract makes sense, it's rather a bug fix and we should just push it out? (In the end KAFKA-4852 was done without a KIP, too – kinda borderline to begin with...)

For example (2), pre-3.4 seems to be correct, while 3.4+ is broken.

For example (3), pre-3.4 seems to be correct, while 3.4+ seems to be incorrect 
– the user did not limit to 4 bytes, so all 8 bytes should be returned IMHO?

For example (4), pre-3.4 seems to be incorrect, while 3.4+ seems to be 
incorrect, too? In the end, the position was not set back to zero, hence a 
zero-length array should be returned?

And last example (5), as already discussed, is also broken in all versions, but 
should result "est".
{quote}I can see the problem comes from the serializer doesn't know if the 
position is an offset or just the next byte to be written.  These are two 
different definitions of the position so it doesn't really make sense to handle 
both cases in a single API call.
{quote}
Not sure if I can follow here? What do you mean by "is an offset or just the next byte to be written"? Why would position have two different definitions?

\cc [~showuon] as RM for the 3.5.2 release – might be worth getting this fix in, too?

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 

[jira] [Updated] (KAFKA-15664) Add 3.4.0 streams upgrade/compatibility tests

2023-10-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15664:

Component/s: streams
 system tests

> Add 3.4.0 streams upgrade/compatibility tests
> -
>
> Key: KAFKA-15664
> URL: https://issues.apache.org/jira/browse/KAFKA-15664
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>
> Per the penultimate bullet on the release checklist, Kafka v3.4.0 is 
> released. We should add this version to the system tests.
> Example PR: https://github.com/apache/kafka/pull/6597/files



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15664) Add 3.4.0 streams upgrade/compatibility tests

2023-10-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15664:

Priority: Critical  (was: Major)

> Add 3.4.0 streams upgrade/compatibility tests
> -
>
> Key: KAFKA-15664
> URL: https://issues.apache.org/jira/browse/KAFKA-15664
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Affects Versions: 3.5.0
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Critical
> Fix For: 3.5.2, 3.7.0, 3.6.1
>
>
> Per the penultimate bullet on the release checklist, Kafka v3.4.0 is 
> released. We should add this version to the system tests.
> Example PR: https://github.com/apache/kafka/pull/6597/files



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15664) Add 3.4.0 streams upgrade/compatibility tests

2023-10-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15664:

Affects Version/s: 3.5.0

> Add 3.4.0 streams upgrade/compatibility tests
> -
>
> Key: KAFKA-15664
> URL: https://issues.apache.org/jira/browse/KAFKA-15664
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Affects Versions: 3.5.0
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 3.5.2, 3.7.0, 3.6.1
>
>
> Per the penultimate bullet on the release checklist, Kafka v3.4.0 is 
> released. We should add this version to the system tests.
> Example PR: https://github.com/apache/kafka/pull/6597/files



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15664) Add 3.4.0 streams upgrade/compatibility tests

2023-10-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15664:

Fix Version/s: 3.5.2
   3.7.0
   3.6.1

> Add 3.4.0 streams upgrade/compatibility tests
> -
>
> Key: KAFKA-15664
> URL: https://issues.apache.org/jira/browse/KAFKA-15664
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 3.5.2, 3.7.0, 3.6.1
>
>
> Per the penultimate bullet on the release checklist, Kafka v3.4.0 is 
> released. We should add this version to the system tests.
> Example PR: https://github.com/apache/kafka/pull/6597/files



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-22 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778511#comment-17778511
 ] 

Matthias J. Sax commented on KAFKA-15602:
-

Hey, [~pnee] pointed me to this ticket. I am not a ByteBuffer expert, but it 
seems to me that a reasonable API contract would be that the user is 
responsible for preparing the buffer for reading, and the returned byte[] array is 
the buffer's content from position to limit. Any other behavior seems odd to me. 
Thoughts? (Not sure if the position should be modified by the serializer or not 
though? I tend to think yes?)

Overall, it seems like a simple case to fix forward? Given that the standard 
use case (position = 0 and limit = max) works in all releases (example (1) in 
the ticket description), I am not really worried about introducing a breaking 
change. If the above proposed contract makes sense, it's rather a bug fix and we 
should just push it out? (In the end, KAFKA-4852 was done without a KIP, too – 
kinda borderline to begin with...) 

For example (2), pre-3.4 seems to be correct, while 3.4+ is broken.

For example (3), pre-3.4 seems to be correct, while 3.4+ seems to be incorrect 
– the user did not limit to 4 bytes, so all 8 bytes should be returned IMHO?

For example (4), pre-3.4 seems to be incorrect, while 3.4+ seems to be 
incorrect, too? In the end, the position was not set back to zero, hence a 
zero-length array should be returned?

And last example (5), as already discussed, is also broken in all versions, but 
should result in "est".
{quote}I can see the problem comes from the serializer doesn't know if the 
position is an offset or just the next byte to be written.  These are two 
different definitions of the position so it doesn't really make sense to handle 
both cases in a single API call.
{quote}
Not sure if I can follow here? What do you mean by "is an offset or just the 
next byte to be written"? Why would position have two different definitions?

\cc [~showuon] as RM for 3.5.2 release – might be worth to get this fix in, too?

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produce 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 
> val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 
> val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8));
> buff.limit(buff.position());|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails for both cases for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers start with position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning bette

[jira] [Updated] (KAFKA-15662) Implement support for clientInstanceIds in Kafka Stream

2023-10-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15662:

Component/s: streams

> Implement support for clientInstanceIds in Kafka Stream
> ---
>
> Key: KAFKA-15662
> URL: https://issues.apache.org/jira/browse/KAFKA-15662
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Apoorv Mittal
>        Assignee: Matthias J. Sax
>Priority: Major
>
> The KIP requires Kafka Stream to support below method to give access to the 
> client instance ids of the producers, consumers and admin clients used by 
> Kafka Streams.
>  
> This method is only permitted when Kafka Streams is in state RUNNING or 
> REBALANCING. In the event that Kafka Streams is not in state RUNNING or 
> REBALANCING, the method throws 
> {{org.apache.kafka.streams.errors.StreamsNotRunningException}} , which is a 
> new subclass of {{InvalidStateStoreException}} .
>  
> {code:java}
> public ClientInstanceIds clientInstanceIds(Duration timeout); {code}
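A hypothetical usage sketch (illustrative only; {{streams}} is assumed to be a running {{KafkaStreams}} instance):
{code:java}
// only permitted while the instance is in state RUNNING or REBALANCING
ClientInstanceIds ids = streams.clientInstanceIds(Duration.ofSeconds(10));
{code}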



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15378) Rolling upgrade system tests are failing

2023-10-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15378.
-
Fix Version/s: 3.4.2
   3.5.2
   3.7.0
   3.6.1
   Resolution: Fixed

> Rolling upgrade system tests are failing
> 
>
> Key: KAFKA-15378
> URL: https://issues.apache.org/jira/browse/KAFKA-15378
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Affects Versions: 3.5.1
>Reporter: Lucas Brutschy
>    Assignee: Matthias J. Sax
>Priority: Major
> Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1
>
>
> The system tests are having failures for these tests:
> {noformat}
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.1.2.to_version=3.6.0-SNAPSHOT
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.2.3.to_version=3.6.0-SNAPSHOT
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.6.0-SNAPSHOT
> {noformat}
> See 
> [https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5801/console]
>  for logs and other test data.
> Note that system tests currently only run with [this 
> fix](https://github.com/apache/kafka/commit/24d1780061a645bb2fbeefd8b8f50123c28ca94e),
>  I think some CVE python library update broke the system tests... 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




Re: [DISCUSS] Apache Kafka 3.5.2 release

2023-10-20 Thread Matthias J. Sax

Thanks for the info Luke.

We did backport all but one PR in the meantime. The missing PR is a 
RocksDB version bump. We want to consider it for 3.5.2, because it 
addresses a CVE.


Cf https://github.com/apache/kafka/pull/14216

However, RocksDB version bumps are a little bit trickier, and we 
would like to test this properly on the 3.5 branch, which would take at least 
one week; we could do the cherry-pick on Monday and start testing.


Please let us know if such a delay for 3.5.2 is acceptable or not.

Thanks.

-Matthias


On 10/20/23 5:44 AM, Luke Chen wrote:

Hi Ryan,

OK, I've backported it to 3.5 branch.
It'll be included in v3.5.2.

Thanks.
Luke

On Fri, Oct 20, 2023 at 7:43 AM Ryan Leslie (BLP/ NEW YORK (REMOT) <
rles...@bloomberg.net> wrote:


Hi Luke,

Hope you are well. Can you please include
https://issues.apache.org/jira/browse/KAFKA-15106 in 3.5.2?

Thanks,

Ryan

From: dev@kafka.apache.org At: 10/17/23 05:05:24 UTC-4:00
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] Apache Kafka 3.5.2 release

Thanks Luke for volunteering for 3.5.2 release.

On Tue, 17 Oct 2023 at 11:58, Josep Prat 
wrote:


Hi Luke,

Thanks for taking this one!

Best,

On Tue, Oct 17, 2023 at 8:12 AM Luke Chen  wrote:


Hi all,

I'd like to volunteer as release manager for the Apache Kafka 3.5.2, to
have an important bug/vulnerability fix release for 3.5.1.

If there are no objections, I'll start building a release plan in

thewiki

in the next couple of weeks.

Thanks,
Luke




--

*Josep Prat*
Open Source Engineering Director, *Aiven*
josep.p...@aiven.io | +491715557497
aiven.io  | 
 
*Aiven Deutschland GmbH*
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B








Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-20 Thread Matthias J. Sax

Forgot one thing:

We could also pass `currentLag()` into `onBatchLoaded()` instead of 
end-offset.



-Matthias

On 10/20/23 10:56 AM, Matthias J. Sax wrote:

Thanks for digging into this Bruno.

The JavaDoc on the consumer does not say anything specific about 
`endOffset` guarantees:


Get the end offsets for the given partitions. In the default {@code 
read_uncommitted} isolation level, the end
offset is the high watermark (that is, the offset of the last 
successfully replicated message plus one). For
{@code read_committed} consumers, the end offset is the last stable 
offset (LSO), which is the minimum of
the high watermark and the smallest offset of any open transaction. 
Finally, if the partition has never been

written to, the end offset is 0.


Thus, I actually believe that it would be ok to change the 
implementation and serve the answer from the `TopicPartitionState`?


Another idea would be to use `currentLag()` in combination with 
`position()` (or the offset of the last read record) to compute the 
end-offset on the fly?



-Matthias

On 10/20/23 4:00 AM, Bruno Cadonna wrote:

Hi,

Matthias is correct that the end offsets are stored somewhere in the 
metadata of the consumer. More precisely, they are stored in the 
`TopicPartitionState`. However, I could not find a public API on the 
consumer other than currentLag() that uses the stored end offsets. If 
I understand the code correctly, the method endOffsets() always triggers a 
remote call.


I am a bit concerned about doing remote calls every commit.interval.ms 
(by default 200ms under EOS). At the moment the remote calls are only 
issued if an optimization for KTables is turned on where changelog 
topics are replaced with the input topic of the KTable. The current 
remote calls retrieve all committed offsets of the group at once. If I 
understand correctly, that is one single remote call. Remote calls for 
getting end offsets of changelog topics -- as I understand you are 
planning to issue -- will probably result in multiple remote calls to 
multiple leaders of the changelog topic partitions.


Please correct me if I misunderstood anything of the above.

If my understanding is correct, I propose to modify the consumer in 
such a way to get the end offset from the locally stored metadata 
whenever possible as part of the implementation of this KIP. I do not 
know what the implications are of such a change of the consumer and if 
a KIP is needed for it. Maybe, endOffsets() guarantees to return the 
freshest end offsets possible, which would not be satisfied with the 
modification.


Regarding the naming, I do not completely agree with Matthias. While 
the pattern might be consistent with onBatchUpdated, what is the 
meaning of onBatchUpdated? Is the batch updated? The names 
onBatchLoaded or onBatchWritten or onBatchAdded are clearer IMO.
With "restore" the pattern works better. If I restore a batch of 
records in a state, the records are not there although they should be 
there, and I add them. If I update a batch of records in a state, this 
sounds like the batch of records is already in the state and I modify the 
existing records within the state. That is clearly not the meaning of 
the event for which the listener should be called.


Best,
Bruno



On 10/19/23 2:12 AM, Matthias J. Sax wrote:

Thanks for the KIP. Seems I am almost late to the party.

About naming (fun, fun, fun): I like the current proposal overall, 
except `onBatchLoaded`, but would prefer `onBatchUpdated`. It better 
aligns with everything else:

  - it's an update-listener, not a loaded-listener
  - `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`, 
`onRestoreSuspended`, and `onBatchRestored` (it's very consistent)
  - `StandbyUpdateListener` should have `onUpdateStart`, 
`onUpdateSuspended` and `onBatchUpdated` to be equally consistent 
(using "loaded" breaks the pattern)



About the end-offset question: I am relatively sure that the consumer 
gets the latest end-offset as attached metadata in every fetch 
response. (We exploit this behavior to track end-offsets for input 
topic with regard to `max.task.idle.ms` without overhead -- it was 
also a concern when we did the corresponding KIP how we could track 
lag with no overhead).


Thus, I believe we would "just" need to modify the code accordingly 
to get this information from the restore-consumer 
(`restoreConsumer.endOffsets(...)`; should be served w/o RPC but from 
the internal metadata cache) for free, and pass it into the listener.


Please double check / verify this claim and keep me honest about it.


-Matthias

On 10/17/23 6:38 AM, Eduwer Camacaro wrote:

Hi Bruno,

Thanks for your observation; surely it will require a network call 
using
the admin client in order to know this "endOffset" and that will 
have an
impact on performance. We can either find a solution that has a low 
impact
on performance or ideally zero impact; unfortunately, I don't see a 
way to

have zero impact 

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-20 Thread Matthias J. Sax

Thanks for digging into this Bruno.

The JavaDoc on the consumer does not say anything specific about 
`endOffset` guarantees:



Get the end offsets for the given partitions. In the default {@code 
read_uncommitted} isolation level, the end
offset is the high watermark (that is, the offset of the last successfully 
replicated message plus one). For
{@code read_committed} consumers, the end offset is the last stable offset 
(LSO), which is the minimum of
the high watermark and the smallest offset of any open transaction. Finally, if 
the partition has never been
written to, the end offset is 0.


Thus, I actually believe that it would be ok to change the 
implementation and serve the answer from the `TopicPartitionState`?


Another idea would be to use `currentLag()` in combination with 
`position()` (or the offset of the last read record) to compute the 
end-offset on the fly?
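
A rough sketch of that idea (illustrative only; assumes `currentLag()` is 
populated for the partition and `consumer` is the restore consumer):

  OptionalLong lag = consumer.currentLag(partition);
  long endOffset = lag.isPresent()
      ? consumer.position(partition) + lag.getAsLong()
      : -1L; // lag not known locally; would need a remote lookup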



-Matthias

On 10/20/23 4:00 AM, Bruno Cadonna wrote:

Hi,

Matthias is correct that the end offsets are stored somewhere in the 
metadata of the consumer. More precisely, they are stored in the 
`TopicPartitionState`. However, I could not find a public API on the 
consumer other than currentLag() that uses the stored end offsets. If I 
understand the code correctly, the method endOffsets() always triggers a 
remote call.


I am a bit concerned about doing remote calls every commit.interval.ms 
(by default 200ms under EOS). At the moment the remote calls are only 
issued if an optimization for KTables is turned on where changelog 
topics are replaced with the input topic of the KTable. The current 
remote calls retrieve all committed offsets of the group at once. If I 
understand correctly, that is one single remote call. Remote calls for 
getting end offsets of changelog topics -- as I understand you are 
planning to issue -- will probably result in multiple remote calls to 
multiple leaders of the changelog topic partitions.


Please correct me if I misunderstood anything of the above.

If my understanding is correct, I propose to modify the consumer in such 
a way to get the end offset from the locally stored metadata whenever 
possible as part of the implementation of this KIP. I do not know what 
the implications are of such a change of the consumer and if a KIP is 
needed for it. Maybe, endOffsets() guarantees to return the freshest end 
offsets possible, which would not be satisfied with the modification.


Regarding the naming, I do not completely agree with Matthias. While the 
pattern might be consistent with onBatchUpdated, what is the meaning of 
onBatchUpdated? Is the batch updated? The names onBatchLoaded or 
onBatchWritten or onBatchAdded are clearer IMO.
With "restore" the pattern works better. If I restore a batch of records 
in a state, the records are not there although they should be there, and 
I add them. If I update a batch of records in a state, this sounds like 
the batch of records is already in the state and I modify the existing records 
within the state. That is clearly not the meaning of the event for which 
the listener should be called.


Best,
Bruno



On 10/19/23 2:12 AM, Matthias J. Sax wrote:

Thanks for the KIP. Seems I am almost late to the party.

About naming (fun, fun, fun): I like the current proposal overall, 
except `onBatchLoaded`, but would prefer `onBatchUpdated`. It better 
aligns with everything else:

  - it's an update-listener, not a loaded-listener
  - `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`, 
`onRestoreSuspended`, and `onBatchRestored` (it's very consistent)
  - `StandbyUpdateListener` should have `onUpdateStart`, 
`onUpdateSuspended` and `onBatchUpdated` to be equally consistent 
(using "loaded" breaks the pattern)



About the end-offset question: I am relatively sure that the consumer 
gets the latest end-offset as attached metadata in every fetch 
response. (We exploit this behavior to track end-offsets for input 
topic with regard to `max.task.idle.ms` without overhead -- it was 
also a concern when we did the corresponding KIP how we could track 
lag with no overhead).


Thus, I believe we would "just" need to modify the code accordingly to 
get this information from the restore-consumer 
(`restoreConsumer.endOffsets(...)`; should be served w/o RPC but from 
the internal metadata cache) for free, and pass it into the listener.


Please double check / verify this claim and keep me honest about it.


-Matthias

On 10/17/23 6:38 AM, Eduwer Camacaro wrote:

Hi Bruno,

Thanks for your observation; surely it will require a network call using
the admin client in order to know this "endOffset" and that will have an
impact on performance. We can either find a solution that has a low 
impact
on performance or ideally zero impact; unfortunately, I don't see a 
way to

have zero impact on performance. However, we can leverage the existing
#maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses the 
admin
client to ask for these "endOf

Re: [DISCUSS] KIP-992 Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery

2023-10-20 Thread Matthias J. Sax

Thanks for the KIP Hanyu,

One questions:


To address this inconsistency, we propose that KeyQuery  should be restricted 
to querying kv-stores  only, ensuring that it always returns a plain V  type, 
making the behavior of the aforementioned code more predictable. Similarly, 
RangeQuery  should be dedicated to querying kv-stores , consistently returning 
only the plain V .


Why do you want to restrict `KeyQuery` and `RangeQuery` to kv-stores? I 
think it would be possible to still allow both queries for ts-kv-stores, 
but change the implementation to return "plain V" instead of 
`ValueAndTimestamp`, i.e., the implementation would automatically 
unwrap the value.
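
For illustration, a minimal sketch of that auto-unwrap idea (`tsStore` and 
`key` are just placeholders for the example):

  ValueAndTimestamp<V> valueAndTimestamp = tsStore.get(key);
  V plainValue = valueAndTimestamp == null ? null : valueAndTimestamp.value();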




-Matthias

On 10/20/23 2:32 AM, Alieh Saeedi wrote:

Hey Hanyu,

Thanks for the KIP. It seems good to me.
Just one point: AFAIK, we are going to remove "get" from the name of all
getter methods.

Cheers,
Alieh

On Thu, Oct 19, 2023 at 5:44 PM Hanyu (Peter) Zheng
 wrote:


Hello everyone,

I would like to start the discussion for KIP-992: Proposal to introduce
IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery

The KIP can be found here:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampedKeyQuery+and+TimestampedRangeQuery

Any suggestions are more than welcome.

Many thanks,
Hanyu

On Thu, Oct 19, 2023 at 8:17 AM Hanyu (Peter) Zheng 
wrote:





https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampedKeyQuery+and+TimestampedRangeQuery


--
Hanyu (Peter) Zheng he/him/his
Software Engineer Intern, Confluent








[jira] [Updated] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal

2023-10-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13973:

Fix Version/s: 3.5.2

> block-cache-capacity metrics worth twice as much as normal
> --
>
> Key: KAFKA-13973
> URL: https://issues.apache.org/jira/browse/KAFKA-13973
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Sylvain Le Gouellec
>Assignee: Nicholas Telford
>Priority: Minor
> Fix For: 3.5.2, 3.7.0, 3.6.1
>
> Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot 
> 2022-06-09 at 09.33.50.png
>
>
> I have created a very simple kafka-streams application with 1 state store. 
> I'm very surprised that the block-cache-capacity metrics show a {{100MB}} 
> block cache capacity instead of the default one in Kafka Streams, which is 
> {{50MB}}.
>  
> My topology :
> StreamsBuilder sb = new StreamsBuilder();
> sb.stream("input")
> .groupByKey()
> .count()
> .toStream()
> .to("output");
>  
> I checked out the {{kafka-streams}} code and I saw a strange thing. When the 
> {{RocksDBTimestampedStore}} store is created, we try to create two column 
> families for backward compatibility with a potential old key/value store.
> In this method, {{setDbAccessor(col1, col2)}}, if the first column is not 
> valid, you close this one 
> ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]).
>  But regarding the rocksdb instance, it seems that the column family is 
> not deleted completely and the metrics exposed by [RocksDB continue to 
> aggregate 
> (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373]
>  {{block-cache-capacity}} for both column families (default and 
> keyValueWithTimestamp).
> Maybe you have to explicitly drop the column family in 
> {{setDbAccessor(col1, col2)}} if the first column is not valid (like 
> {{db.dropColumnFamily(noTimestampColumnFamily);}}).
>  
> I tried to drop the {{noTimestampColumnFamily}} in {{setDbAccessor}} if the first 
> column is not valid, like:
>  
> {code:java}
> private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
>final ColumnFamilyHandle 
> withTimestampColumnFamily) throws RocksDBException {
> final RocksIterator noTimestampsIter = 
> db.newIterator(noTimestampColumnFamily);
> noTimestampsIter.seekToFirst();
> if (noTimestampsIter.isValid()) {
> log.info("Opening store {} in upgrade mode", name);
> dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, 
> withTimestampColumnFamily);
> } else {
> log.info("Opening store {} in regular mode", name);
> dbAccessor = new 
> SingleColumnFamilyAccessor(withTimestampColumnFamily);
> noTimestampColumnFamily.close();
> db.dropColumnFamily(noTimestampColumnFamily); // try fix it
> }
> noTimestampsIter.close();
> }{code}
>  
>  
>  
> But it seems that you can't drop the default column family in RocksDB 
> (see screenshot).
> *So how can we have the real block-cache-capacity metrics value in Kafka 
> Streams monitoring?*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-7699) Improve wall-clock time punctuations

2023-10-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1550#comment-1550
 ] 

Matthias J. Sax commented on KAFKA-7699:


Happy to support you. The KIP wiki page describes how it works: 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] 
– if you have any questions about it, happy to answer them.

> Improve wall-clock time punctuations
> 
>
> Key: KAFKA-7699
> URL: https://issues.apache.org/jira/browse/KAFKA-7699
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>            Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> Currently, wall-clock time punctuations allow scheduling periodic callbacks 
> based on wall-clock time progress. The punctuation timer starts when the 
> punctuation is scheduled; thus, the absolute trigger time is non-deterministic, 
> which is what is desired for many use cases (I want a call-back in 5 minutes from "now").
> It would be a nice improvement to allow users to "anchor" wall-clock 
> punctuations, too, similar to a cron job: thus, a punctuation would be 
> triggered at "fixed" times like the beginning of the next hour, independent 
> of when the punctuation was registered.
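
A minimal sketch of how such anchoring can be approximated with the current API 
(illustrative only; assumes it runs in a processor's {{init()}} with access to 
{{context}}, and {{doHourlyWork}} is a hypothetical punctuator method):
{code:java}
// schedule a bootstrap punctuation at the next hour boundary, then switch to a fixed 1h period
final long hourMs = Duration.ofHours(1).toMillis();
final long delayMs = hourMs - (System.currentTimeMillis() % hourMs);
final Cancellable[] bootstrap = new Cancellable[1];
bootstrap[0] = context.schedule(Duration.ofMillis(delayMs), PunctuationType.WALL_CLOCK_TIME, ts -> {
    bootstrap[0].cancel(); // the bootstrap schedule is only needed once
    doHourlyWork(ts);
    context.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME, this::doHourlyWork);
});
{code}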



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #2305

2023-10-19 Thread Matthias J. Sax

You would need to unsubscribe from the dev list.

I would recommend setting up a filter with your email provider if you don't 
want these, and redirect them directly to trash.



-Matthias

On 10/19/23 4:49 AM, Shyam P wrote:

how to unsubscribe this ?

On Thu, Oct 19, 2023 at 1:30 PM Apache Jenkins Server <
jenk...@builds.apache.org> wrote:


See <
https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2305/display/redirect









[jira] [Commented] (KAFKA-7699) Improve wall-clock time punctuations

2023-10-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1368#comment-1368
 ] 

Matthias J. Sax commented on KAFKA-7699:


[~hermankj] – thanks for your comment. – This change would require a KIP but 
should not be too difficult to do. Would you have interest to pick it up?

> Improve wall-clock time punctuations
> 
>
> Key: KAFKA-7699
> URL: https://issues.apache.org/jira/browse/KAFKA-7699
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>            Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> Currently, wall-clock time punctuations allow scheduling periodic callbacks 
> based on wall-clock time progress. The punctuation timer starts when the 
> punctuation is scheduled; thus, the absolute trigger time is non-deterministic, 
> which is what is desired for many use cases (I want a call-back in 5 minutes from "now").
> It would be a nice improvement to allow users to "anchor" wall-clock 
> punctuations, too, similar to a cron job: thus, a punctuation would be 
> triggered at "fixed" times like the beginning of the next hour, independent 
> of when the punctuation was registered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15616) Define client telemetry states and their transitions

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15616:

Fix Version/s: 3.7.0

> Define client telemetry states and their transitions
> 
>
> Key: KAFKA-15616
> URL: https://issues.apache.org/jira/browse/KAFKA-15616
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.7.0
>
>
> The client emitting metrics to broker needs to maintain states which 
> specifies what next action client should take i.e. request subscriptions, 
> push telemetry, etc.
>  
> The changes should include comprehensive definition of all states a client 
> can move into and their transitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13973:

Fix Version/s: 3.6.1

> block-cache-capacity metrics worth twice as much as normal
> --
>
> Key: KAFKA-13973
> URL: https://issues.apache.org/jira/browse/KAFKA-13973
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Sylvain Le Gouellec
>Assignee: Nicholas Telford
>Priority: Minor
> Fix For: 3.7.0, 3.6.1
>
> Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot 
> 2022-06-09 at 09.33.50.png
>
>
> I have created a very simple kafka-streams application with 1 state store. 
> I'm very surprised that the block-cache-capacity metrics show a {{100MB}} 
> block cache capacity instead of the default one in Kafka Streams, which is 
> {{50MB}}.
>  
> My topology :
> StreamsBuilder sb = new StreamsBuilder();
> sb.stream("input")
> .groupByKey()
> .count()
> .toStream()
> .to("output");
>  
> I checked out the {{kafka-streams}} code and I saw a strange thing. When the 
> {{RocksDBTimestampedStore}} store is created, we try to create two column 
> families for backward compatibility with a potential old key/value store.
> In this method, {{setDbAccessor(col1, col2)}}, if the first column is not 
> valid, you close this one 
> ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]).
>  But regarding the rocksdb instance, it seems that the column family is 
> not deleted completely and the metrics exposed by [RocksDB continue to 
> aggregate 
> (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373]
>  {{block-cache-capacity}} for both column families (default and 
> keyValueWithTimestamp).
> Maybe you have to explicitly drop the column family in 
> {{setDbAccessor(col1, col2)}} if the first column is not valid (like 
> {{db.dropColumnFamily(noTimestampColumnFamily);}}).
>  
> I tried to drop the {{noTimestampColumnFamily}} in {{setDbAccessor}} if the first 
> column is not valid, like:
>  
> {code:java}
> private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
>final ColumnFamilyHandle 
> withTimestampColumnFamily) throws RocksDBException {
> final RocksIterator noTimestampsIter = 
> db.newIterator(noTimestampColumnFamily);
> noTimestampsIter.seekToFirst();
> if (noTimestampsIter.isValid()) {
> log.info("Opening store {} in upgrade mode", name);
> dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, 
> withTimestampColumnFamily);
> } else {
> log.info("Opening store {} in regular mode", name);
> dbAccessor = new 
> SingleColumnFamilyAccessor(withTimestampColumnFamily);
> noTimestampColumnFamily.close();
> db.dropColumnFamily(noTimestampColumnFamily); // try fix it
> }
> noTimestampsIter.close();
> }{code}
>  
>  
>  
> But it seems that you can't drop the default column family in RocksDB 
> (see screenshot).
> *So how can we have the real block-cache-capacity metrics value in Kafka 
> Streams monitoring?*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15338) The metric group documentation for metrics added in KAFKA-13945 is incorrect

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15338:

Affects Version/s: (was: 3.4.0)
   (was: 3.3.1)
   (was: 3.3.2)
   (was: 3.5.0)
   (was: 3.4.1)
   (was: 3.5.1)

> The metric group documentation for metrics added in KAFKA-13945 is incorrect
> 
>
> Key: KAFKA-15338
> URL: https://issues.apache.org/jira/browse/KAFKA-15338
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0
>Reporter: Neil Buesing
>Assignee: Atul Sharma
>Priority: Trivial
>  Labels: beginner, newbie
> Fix For: 3.3.3, 3.6.0, 3.4.2, 3.5.2
>
>
> ops.html (docs/streams/ops.html) incorrectly states that the metrics type is 
> "stream-processor-node-metrics", but in looking at the metrics and inspecting 
> the code in TopicMetrics, these metrics have a type of "stream-topic-metrics".
> 4 metrics are in error "bytes-consumed-total", "bytes-produced-total", 
> "records-consumed-total", and "records-produced-total".
> Looks like the type was changed from the KIP, and the documentation still 
> reflects the KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15338) The metric group documentation for metrics added in KAFKA-13945 is incorrect

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15338:

Fix Version/s: 3.3.3
   3.4.2
   3.5.2

> The metric group documentation for metrics added in KAFKA-13945 is incorrect
> 
>
> Key: KAFKA-15338
> URL: https://issues.apache.org/jira/browse/KAFKA-15338
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1, 3.5.1
>Reporter: Neil Buesing
>Assignee: Atul Sharma
>Priority: Trivial
>  Labels: beginner, newbie
> Fix For: 3.3.3, 3.6.0, 3.4.2, 3.5.2
>
>
> ops.html (docs/streams/ops.html) incorrectly states that the metrics type is 
> "stream-processor-node-metrics", but in looking at the metrics and inspecting 
> the code in TopicMetrics, these metrics have a type of "stream-topic-metrics".
> 4 metrics are in error "bytes-consumed-total", "bytes-produced-total", 
> "records-consumed-total", and "records-produced-total".
> Looks like the type was changed from the KIP, and the documentation still 
> reflects the KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15629) proposal to introduce IQv2 Query Types: TimeStampKeyQuery and TimeStampRangeQuery

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15629:

Description: 
KIP-992: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampKeyQuery+and+TimestampRangeQuery]

In the current IQv2 code, there are noticeable differences when interfacing 
with plain-kv-store and ts-kv-store. Notably, the return type V acts as a 
simple value for plain-kv-store but evolves into ValueAndTimestamp for 
ts-kv-store, which presents type safety issues in the API.

Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring 
compatibility concerns.

This brings us to the essence of our proposal: the introduction of distinct 
query types. One that returns a plain value, another for values accompanied by 
timestamps.

While querying a ts-kv-store for a plain value and then extracting it is 
feasible, it doesn't make sense to query a plain-kv-store for a 
ValueAndTimestamp.

Our vision is for plain-kv-store to always return V, while ts-kv-store should 
return ValueAndTimestamp.
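
For illustration, a sketch of the inconsistency with the existing IQv2 API (store 
name and value type are just examples):
{code:java}
// Typed as String, but against a ts-kv-store the result effectively carries a
// ValueAndTimestamp<String> at runtime, despite the query's type parameter.
KeyQuery<String, String> query = KeyQuery.withKey("k");
StateQueryResult<String> result =
    kafkaStreams.query(StateQueryRequest.inStore("my-store").withQuery(query));
{code}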

  was:
In the current IQv2 code, there are noticeable differences when interfacing 
with plain-kv-store and ts-kv-store. Notably, the return type V acts as a 
simple value for plain-kv-store but evolves into ValueAndTimestamp for 
ts-kv-store, which presents type safety issues in the API.

Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring 
compatibility concerns.

This brings us to the essence of our proposal: the introduction of distinct 
query types. One that returns a plain value, another for values accompanied by 
timestamps.

While querying a ts-kv-store for a plain value and then extracting it is 
feasible, it doesn't make sense to query a plain-kv-store for a 
ValueAndTimestamp.

Our vision is for plain-kv-store to always return V, while ts-kv-store should 
return ValueAndTimestamp.


> proposal to introduce IQv2 Query Types: TimeStampKeyQuery and 
> TimeStampRangeQuery
> -
>
> Key: KAFKA-15629
> URL: https://issues.apache.org/jira/browse/KAFKA-15629
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: kip
>
> KIP-992: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampKeyQuery+and+TimestampRangeQuery]
> In the current IQv2 code, there are noticeable differences when interfacing 
> with plain-kv-store and ts-kv-store. Notably, the return type V acts as a 
> simple value for plain-kv-store but evolves into ValueAndTimestamp for 
> ts-kv-store, which presents type safety issues in the API.
> Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring 
> compatibility concerns.
> This brings us to the essence of our proposal: the introduction of distinct 
> query types. One that returns a plain value, another for values accompanied 
> by timestamps.
> While querying a ts-kv-store for a plain value and then extracting it is 
> feasible, it doesn't make sense to query a plain-kv-store for a 
> ValueAndTimestamp.
> Our vision is for plain-kv-store to always return V, while ts-kv-store should 
> return ValueAndTimestamp.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15629) proposal to introduce IQv2 Query Types: TimeStampKeyQuery and TimeStampRangeQuery

2023-10-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15629:

Labels: kip  (was: )

> proposal to introduce IQv2 Query Types: TimeStampKeyQuery and 
> TimeStampRangeQuery
> -
>
> Key: KAFKA-15629
> URL: https://issues.apache.org/jira/browse/KAFKA-15629
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: kip
>
> In the current IQv2 code, there are noticeable differences when interfacing 
> with plain-kv-store and ts-kv-store. Notably, the return type V acts as a 
> simple value for plain-kv-store but evolves into ValueAndTimestamp for 
> ts-kv-store, which presents type safety issues in the API.
> Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring 
> compatibility concerns.
> This brings us to the essence of our proposal: the introduction of distinct 
> query types. One that returns a plain value, another for values accompanied 
> by timestamps.
> While querying a ts-kv-store for a plain value and then extracting it is 
> feasible, it doesn't make sense to query a plain-kv-store for a 
> ValueAndTimestamp.
> Our vision is for plain-kv-store to always return V, while ts-kv-store should 
> return ValueAndTimestamp.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-18 Thread Matthias J. Sax

Thanks for the KIP. Seems I am almost late to the party.

About naming (fun, fun, fun): I like the current proposal overall, 
except `onBatchLoaded`, but would prefer `onBatchUpdated`. It better 
aligns with everything else:

 - it's an update-listener, not a loaded-listener
 - `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`, 
`onRestoreSuspended`, and `onBatchRestored` (it's very consistent)
 - `StandbyUpdateListener` should have `onUpdateStart`, 
`onUpdateSuspended` and `onBatchUpdated` to be equally consistent 
(using "loaded" breaks the pattern)



About the end-offset question: I am relatively sure that the consumer 
gets the latest end-offset as attached metadata in every fetch response. 
(We exploit this behavior to track end-offsets for input topic with 
regard to `max.task.idle.ms` without overhead -- it was also a concern 
when we did the corresponding KIP how we could track lag with no overhead).


Thus, I believe we would "just" need to modify the code accordingly to 
get this information from the restore-consumer 
(`restoreConsumer.endOffsets(...)`; should be served w/o RPC but from 
the internal metadata cache) for free, and pass it into the listener.
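
A minimal sketch of that wiring (illustrative only; whether this can be answered 
without an RPC is exactly the claim to verify):

  Map<TopicPartition, Long> endOffsets = restoreConsumer.endOffsets(changelogPartitions);
  Long changelogEndOffset = endOffsets.get(partition);
  // ... pass changelogEndOffset into the standby-update listener callback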


Please double check / verify this claim and keep me honest about it.


-Matthias

On 10/17/23 6:38 AM, Eduwer Camacaro wrote:

Hi Bruno,

Thanks for your observation; surely it will require a network call using
the admin client in order to know this "endOffset" and that will have an
impact on performance. We can either find a solution that has a low impact
on performance or ideally zero impact; unfortunately, I don't see a way to
have zero impact on performance. However, we can leverage the existing
#maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses the admin
client to ask for these "endOffset"s. As far I can understand, this update
is done periodically using the "commit.interval.ms" configuration. I
believe this option will force us to invoke StandbyUpdateLister once this
interval is reached.

On Mon, Oct 16, 2023 at 8:52 AM Bruno Cadonna  wrote:


Thanks for the KIP, Colt and Eduwer,

Are you sure there is also not a significant performance impact for
passing into the callback `currentEndOffset`?

I am asking because the comment here:

https://github.com/apache/kafka/blob/c32d2338a7e0079e539b74eb16f0095380a1ce85/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L129

says that the end-offset is only updated once for standby tasks whose
changelog topic is not piggy-backed on input topics. I could also not
find the update of end-offset for those standbys.


Best,
Bruno

On 10/16/23 10:55 AM, Lucas Brutschy wrote:

Hi all,

it's a nice improvement! I don't have anything to add on top of the
previous comments, just came here to say that it seems to me consensus
has been reached and the result looks good to me.

Thanks Colt and Eduwer!
Lucas

On Sun, Oct 15, 2023 at 9:11 AM Colt McNealy 

wrote:


Thanks, Guozhang. I've updated the KIP and will start a vote.

Colt McNealy

*Founder, LittleHorse.dev*


On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang <

guozhang.wang...@gmail.com>

wrote:


Thanks for the summary, that looks good to me.

Guozhang

On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy 

wrote:


Hello there!

Thanks everyone for the comments. There's a lot of back-and-forth going on,
so I'll do my best to summarize what everyone's said in TLDR format:

1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`, and do similarly
for the other methods.
2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
3. Remove the `earliestOffset` parameter for performance reasons.

If that's all fine with everyone, I'll update the KIP and we—well, mostly
Edu (: —will open a PR.

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro <

edu...@littlehorse.io>

wrote:


Hello everyone,

Thanks for all your feedback for this KIP!

I think that the key to choosing proper names for this API is understanding
the terms used inside the StoreChangelogReader. Currently, this class has
two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my opinion,
using StandbyUpdateListener for the interface fits better on these terms.

Same applies for onUpdateStart/Suspended.

StoreChangelogReader uses "the same mechanism" for active task restoration
and standby task updates, but this is an implementation detail. Under
normal circumstances (no rebalances or task migrations), the changelog
reader will be in STANDBY_UPDATING, which means it will be updating standby
tasks as long as there are new records in the changelog topic. That's why I
prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't 100%
align with StateRestoreListener, but either one is fine.

Edu

On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <

guozhang.wang...@gmail.com>

wrote:


Hello Colt,

Thanks for writing the KIP! I have read through the updated KIP and

[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2023-10-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776381#comment-17776381
 ] 

Matthias J. Sax commented on KAFKA-6520:


Could we use Admin.describeCluster() which returns the list of brokers that are 
currently online, and go into DISCONNECTED if we don't get anything back?
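
A rough sketch of that check (illustrative only; error and timeout handling are 
open questions):
{code:java}
DescribeClusterResult cluster = adminClient.describeCluster();
// get() throws checked exceptions (interrupted/execution/timeout) that would need handling
Collection<Node> brokers = cluster.nodes().get(5, TimeUnit.SECONDS);
boolean looksDisconnected = brokers == null || brokers.isEmpty();
{code}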

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Vince Mu
>Priority: Major
>  Labels: newbie, user-experience
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams]
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
>  See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  This is a link to a related 
> issue.
> -
> Update: there are some discussions on the PR itself which leads me to think 
> that a more general solution should be at the ClusterConnectionStates rather 
> than at the Streams or even Consumer level. One proposal would be:
>  * Add a new metric named `failedConnection` in SelectorMetrics which is 
> recorded at `connect()` and `pollSelectionKeys()` functions, upon capture the 
> IOException / RuntimeException which indicates the connection disconnected.
>  * And then users of Consumer / Streams can monitor on this metric, which 
> normally will only have close to zero values as we have transient 
> disconnects; if it is spiking it means the brokers are consistently being 
> unavailable, indicating the disconnected state.
> [~Yohan123] WDYT?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Apache Kafka 3.5.2 release

2023-10-17 Thread Matthias J. Sax
Thanks -- there are a few fixes for Kafka Streams we are considering to 
cherry-pick to get into the 3.5.2 release -- what timeline do you target for 
the release?



-Matthias

On 10/17/23 8:47 AM, Divij Vaidya wrote:

Thank you for volunteering Luke.

--
Divij Vaidya



On Tue, Oct 17, 2023 at 3:26 PM Bill Bejeck  wrote:


Thanks for driving the release, Luke.

+1
-Bill

On Tue, Oct 17, 2023 at 5:05 AM Satish Duggana 
wrote:


Thanks Luke for volunteering for 3.5.2 release.

On Tue, 17 Oct 2023 at 11:58, Josep Prat 
wrote:


Hi Luke,

Thanks for taking this one!

Best,

On Tue, Oct 17, 2023 at 8:12 AM Luke Chen  wrote:


Hi all,

I'd like to volunteer as release manager for the Apache Kafka 3.5.2,

to

have an important bug/vulnerability fix release for 3.5.1.

If there are no objections, I'll start building a release plan in

thewiki

in the next couple of weeks.

Thanks,
Luke




--

*Josep Prat*
Open Source Engineering Director, *Aiven*
josep.p...@aiven.io   |   +491715557497
aiven.io   |   https://www.facebook.com/aivencloud   |   https://twitter.com/aiven_io

*Aiven Deutschland GmbH*
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B








Re: [VOTE] KIP-985 Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-16 Thread Matthias J. Sax

+1 (binding)


On 10/13/23 9:24 AM, Hanyu (Peter) Zheng wrote:

Hello everyone,

I would like to start a vote for KIP-985 that Add reverseRange and
reverseAll query over kv-store in IQv2.

Sincerely,
Hanyu

On Fri, Oct 13, 2023 at 9:15 AM Hanyu (Peter) Zheng 
wrote:



https://cwiki.apache.org/confluence/display/KAFKA/KIP-985:+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2

--
Hanyu (Peter) Zheng he/him/his
Software Engineer Intern, Confluent







[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"

2023-10-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775325#comment-17775325
 ] 

Matthias J. Sax commented on KAFKA-13152:
-

[~guozhang] would you have interest in helping get this into 3.7? It keeps 
slipping... :(

> Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with 
> "{statestore.cache}/{input.buffer}.max.bytes"
> -
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> The current config "buffered.records.per.partition" controls the maximum number 
> of records to bookkeep, and when it is exceeded we pause fetching from 
> this partition. However this config has two issues:
> * It's a per-partition config, so the total memory consumed is dependent on 
> the dynamic number of partitions assigned.
> * Record size could vary from case to case.
> Hence it's hard to bound the memory usage for this buffering. We should 
> consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", 
> which controls how many bytes in total are allowed to be buffered. This is 
> doable since we buffer the raw records in .
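
As a rough illustration of the proposed change -- "input.buffer.max.bytes" and "statestore.cache.max.bytes" are only the config names proposed in this ticket, not settings of a released Kafka version -- a Streams application would move from the per-partition/record-count settings to global, byte-based ones roughly like this:

{code:java}
// Sketch only: the last two config names are the ones proposed in this ticket;
// the two before them are the existing settings the ticket wants to deprecate.
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// existing settings (per-partition record count + cache bytes):
props.put("buffered.records.per.partition", 1000);
props.put("cache.max.bytes.buffering", 10 * 1024 * 1024L);

// as proposed here (global, byte-based bounds):
props.put("input.buffer.max.bytes", 64 * 1024 * 1024L);
props.put("statestore.cache.max.bytes", 10 * 1024 * 1024L);

StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic").to("output-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
{code}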



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15594) Add 3.6.0 to streams upgrade/compatibility tests

2023-10-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775323#comment-17775323
 ] 

Matthias J. Sax commented on KAFKA-15594:
-

This ticket is to include upgrade tests from 3.6 to 3.7/trunk – this can only be done 
after 3.6 is released – it's WIP.

> Add 3.6.0 to streams upgrade/compatibility tests
> 
>
> Key: KAFKA-15594
> URL: https://issues.apache.org/jira/browse/KAFKA-15594
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, system tests
>Reporter: Satish Duggana
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15594) Add 3.6.0 to streams upgrade/compatibility tests

2023-10-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775324#comment-17775324
 ] 

Matthias J. Sax commented on KAFKA-15594:
-

This ticket is to include upgrade tests from 3.6 to 3.7/trunk – this can only be done 
after 3.6 is released – it's WIP.

> Add 3.6.0 to streams upgrade/compatibility tests
> 
>
> Key: KAFKA-15594
> URL: https://issues.apache.org/jira/browse/KAFKA-15594
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, system tests
>Reporter: Satish Duggana
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15593) Add 3.6.0 to broker/client upgrade/compatibility tests

2023-10-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775322#comment-17775322
 ] 

Matthias J. Sax commented on KAFKA-15593:
-

This ticket is to include upgrade tests from 3.6 to 3.7/trunk – this can only be done 
after 3.6 is released – it's WIP.

> Add 3.6.0 to broker/client upgrade/compatibility tests
> --
>
> Key: KAFKA-15593
> URL: https://issues.apache.org/jira/browse/KAFKA-15593
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Requesting permission for contributions

2023-10-13 Thread Matthias J. Sax

Done. You should be all set.

-Matthias

On 10/13/23 8:21 AM, Apoorv Mittal wrote:

Hi,
Can I please get permission to contribute KIP and assign Jiras to myself.

Wiki and Jira Id: apoorvmittal10
Email: apoorvmitta...@gmail.com

Regards,
Apoorv Mittal
+44 7721681581



Re: [DISCUSS] 3.5.2 Release

2023-10-13 Thread Matthias J. Sax
Thanks -- there are a few fixes for Kafka Streams we are considering 
cherry-picking into the 3.5.2 release -- can you give us a few more days 
for this?


-Matthias

On 10/12/23 6:20 PM, Sophie Blee-Goldman wrote:

Thanks for volunteering Luke!

On Thu, Oct 12, 2023 at 2:55 AM Levani Kokhreidze 
wrote:


Hi Divij,

Thanks for the explanation, makes sense.

Hi Luke, thank you! It would be awesome to see 3.5.2 out.

Best,
Levani


On 12. Oct 2023, at 12:39, Luke Chen  wrote:

Hi Levani and Divij,

I can work on the 3.5.2 release.
I'll start a new thread for volunteering it maybe next week.

Thanks.
Luke

On Thu, Oct 12, 2023 at 5:07 PM Divij Vaidya 
wrote:


Hello Levani

From a process perspective, there is no fixed schedule for bug fix
releases. If we have a volunteer for release manager (must be a
committer), they can start the bug fix release process (with the approval
of the PMC).

My personal opinion is that it's too early to start 3.6.1 and we should
wait at least 1 month to hear feedback on 3.6.0. We need to strike a
careful balance between getting the critical fixes into the hands of users
as soon as possible vs. spending community effort on releases (effort that
could otherwise be used to make Kafka better, feature-wise and in terms of
operational stability).

For 3.5.2, I think there are sufficient fixes pending (including some CVE
fixes) to start a bug fix release. We just need a volunteer for the
release manager.

--
Divij Vaidya



On Thu, Oct 12, 2023 at 9:57 AM Levani Kokhreidze <levani.co...@gmail.com> wrote:


Hello,

KAFKA-15571 [1] was merged and backported to the 3.5 and 3.6 branches. The
bug fix addresses a feature that was added in 3.5. Considering the feature
doesn't work as expected without the fix, I would like to know if it's
reasonable to start the 3.5.2 release. Of course, releasing such a massive
project like Kafka is not a trivial task, and I am looking for the
community's input on whether it's reasonable to start the 3.5.2 release
process.

Best,
Levani

[1] - https://issues.apache.org/jira/browse/KAFKA-15571









Re: [DISCUSS] KIP-985 Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-13 Thread Matthias J. Sax

Thanks for pointing this out Alieh! I totally missed this.

So I guess everything is settled and Hanyu can start a VOTE?

For the KIP PR, we should ensure to update the JavaDocs to avoid 
confusion in the future.



-Matthias

On 10/12/23 12:21 PM, Alieh Saeedi wrote:

Hi,
just pointing to javadocs for range() and reverseRange():

range(): *@return The iterator for this range, from smallest to largest
bytes.*
reverseRange(): * @return The reverse iterator for this range, from largest
to smallest key bytes.

Cheers,
Alieh


On Thu, Oct 12, 2023 at 7:32 AM Matthias J. Sax  wrote:


Quick addendum.

Some DSL operators use `range` and seem to rely on ascending order,
too... Of course, if we say `range()` has no ordering guarantee, we
would add `forwardRange()` and let the DSL use `forwardRange`, too.

The discussion of course also applies to `all()` and `reverseAll()`,
and I assume also `prefixScan()` (even if there is no "reverse" version
for it).


On 10/11/23 10:22 PM, Matthias J. Sax wrote:

Thanks for raising this question Hanyu. Great find!

My interpretation is as follows (it's actually a warning sign that the
API contract is not well defined, and we should fix this by extending
the JavaDocs and the docs on the web page about it).

We have existing `range()` and `reverseRange()` methods on
`ReadOnlyKeyValueStore` -- the interface itself is not typed (ie, just
generics), and we state that we don't guarantee "logical order" because
underlying stores are based on `byte[]` type. So far so... well.

However, to make matters worse, we are also not explicit about whether the
underlying store implementation *must* return keys in
byte[]-lexicographical order or not...

For `range()`, I would be kinda willing to accept that there is no
ordering guarantee at all -- for example, if the underlying byte[]-store
is hash-based and implements a full scan to answer a `range()` it might
not be efficient, but also not incorrect if keys are returned in some
"random" (byte[]-)order. In isolation, I don't see an API contract
violation.

However, `reverseRange` implicitly states with its name that some
"descending order" (based on keys) is expected. Given the JavaDoc comment
about "logical" vs "byte[]" order, the contract (at least to me) is
clear: it returns records in descending byte[]-lexicographical order. --
Any other interpretation seems to be off? Curious to hear if you agree
or disagree with this interpretation?



If this is correct, it means we are actually lacking an API contract for
an ascending byte[]-lexicographical range scan. Furthermore, a hash-based
byte[]-store would need to explicitly sort its results for
`reverseRange` to not violate the contract.

To me, this raises the question if `range()` actually has a
(non-explicit) contract about returning data in byte[]-lexicographical
order? It seems a lot of people rely on this, and our default stores
actually implement it this way. So if we don't look at `range()` in
isolation, but look at the `ReadOnlyKeyValueStore` interface
holistically, I would also buy the argument that `range()` implies
"ascending byte[]-lexicographical order". Thoughts?

To be frank: to me, it's pretty clear that the original idea to add
`range()` was to return data in ascending order.


Question 1:
   - Do we believe that the range() contract is ascending
byte[]-lexicographical order right now?

 If yes, I would propose to make it explicit in the JavaDocs.

 If no, I would also propose to make it explicit in the JavaDocs. In
addition, it raises the question if a method `forwardRange()` (for the
lack of a better idea about a name right now) is actually missing to
provide such a contract?


Of course, we always depend on the serialization format for order, and
if users need "logical order" they need to ensure to use a serialization
format that align byte[]-lexicographical order to logical order. But for
the scope of this work, I would not even try to open this can of worms...




Looking into `RangeQuery`, the JavaDocs don't say anything about order.
Thus, `RangeQuery#range()` could, it seems, also be implemented by calling
`reverseRange()` without violating the contract. A hash-based
store could also implement it, without the need to explicitly sort...

Which brings me back to my original thought about having three types of
results for `Range`:
   - no ordering guarantee
   - ascending (we would only give byte[]-lexicographical order)
   - descending (we would only give byte[]-lexicographical order)

Again, I actually believe that the original intent of RangeQuery was to
inherit the ascending order of `ReadOnlyKeyValueStore#range()`... Please
keep me honest about it.  On the other hand, both APIs seem to be
independent enough to not couple them... -- this could actually be a
step in the right direction and would follow the underlying idea of
IQv2 to begin with: decouple semantics for the store interfaces from the
query types and semantics...
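
To make the two methods under discussion concrete, here is a minimal sketch of how they are typically used from interactive queries today (the `streams` handle and the "counts" store name are placeholders); with the built-in RocksDB and in-memory stores, `range()` happens to return ascending and `reverseRange()` descending byte[]-lexicographical order -- whether that is a contract or an implementation detail is exactly the open question above:

```
// Minimal sketch; "counts" and the `streams` handle are placeholders. The
// ordering shown is what the built-in stores happen to provide today, not a
// stated API guarantee (which is the gap discussed above).
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

ReadOnlyKeyValueStore<String, Long> store = streams.store(
        StoreQueryParameters.fromNameAndType("counts",
                QueryableStoreTypes.<String, Long>keyValueStore()));

try (KeyValueIterator<String, Long> ascending = store.range("a", "f");
     KeyValueIterator<String, Long> descending = store.reverseRange("a", "f")) {
    ascending.forEachRemaining(kv -> System.out.println(kv.key + " -> " + kv.value));
    descending.forEachRemaining(kv -> System.out.println(kv.key + " -> " + kv.value));
}
```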

[jira] [Updated] (KAFKA-15601) Client metrics and observability

2023-10-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15601:

Component/s: streams

> Client metrics and observability
> 
>
> Key: KAFKA-15601
> URL: https://issues.apache.org/jira/browse/KAFKA-15601
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, consumer, core, producer , streams
>Reporter: Apoorv Mittal
>Priority: Major
>  Labels: kip
>
> This Jira tracks the development of KIP-714: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15601) Client metrics and observability

2023-10-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15601:

Component/s: admin
 clients
 consumer
 core
 producer 

> Client metrics and observability
> 
>
> Key: KAFKA-15601
> URL: https://issues.apache.org/jira/browse/KAFKA-15601
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, consumer, core, producer 
>Reporter: Apoorv Mittal
>Priority: Major
>
> This Jira tracks the development of KIP-714: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15601) Client metrics and observability

2023-10-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15601:

Labels: kip  (was: )

> Client metrics and observability
> 
>
> Key: KAFKA-15601
> URL: https://issues.apache.org/jira/browse/KAFKA-15601
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, consumer, core, producer 
>Reporter: Apoorv Mittal
>Priority: Major
>  Labels: kip
>
> This Jira tracks the development of KIP-714: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-12 Thread Matthias J. Sax
Thanks Andrew. Makes sense to me. Adding the parameter-less overload was 
just a random idea. No need to extend the KIP.



-Matthias

On 10/12/23 12:12 PM, Jun Rao wrote:

Hi, Andrew,

Thanks for the reply.

131. Could we also document how one could correlate each client instance in
KStreams with the labels for the metrics received by the brokers?

132. The documentation for RequestsPerSec is not complete. If you trace
through how
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L71
<https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L579>
is
implemented, it includes every API key tagged with the corresponding
listener.

Jun

On Thu, Oct 12, 2023 at 11:42 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:


Hi Jun,
Thanks for your comments.

130. As Matthias described, and I am adding to the KIP, the
`KafkaStreams#clientInstanceIds` method
is only permitted when the state is RUNNING or REBALANCING. Also, clients
can be added dynamically
so the maps might change over time. If it’s in a permitted state, the
method is prepared to wait up to the
supplied timeout to get the client instance ids. It does not return a
partial result - it returns a result or
fails.

131. I’ve refactored the `ClientsInstanceIds` object and the global
consumer is now part of the map
of consumers. There is no need for the Optional any longer. I’ve also
renamed it `ClientInstanceIds`.

132. My reading of
`(kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*)` is that
It does not support every request type - it supports Produce,
FetchConsumer and FetchFollower.
Consequently, I think the ClientMetricsSubscriptionRequestCount is not
instantly obsolete.

If I’ve misunderstood, please let me know.

Thanks,
Andrew



On 12 Oct 2023, at 01:07, Jun Rao  wrote:

Hi, Andrew,

Thanks for the updated KIP. Just a few more minor comments.

130. KafkaStreams.clientsInstanceId(Duration timeout): Does it wait for

all

consumer/producer/adminClient instances to be initialized? Are all those
instances created during KafkaStreams initialization?

131. Why does globalConsumerInstanceId() return Optional while
other consumer instances don't return Optional?

132. ClientMetricsSubscriptionRequestCount: Do we need this since we

have a

set of generic metrics
(kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*) that
report Request rate for every request type?

Thanks,

Jun

On Wed, Oct 11, 2023 at 1:47 PM Matthias J. Sax 

wrote:



Thanks!

On 10/10/23 11:31 PM, Andrew Schofield wrote:

Matthias,
Yes, I think that’s a sensible way forward and the interface you

propose

looks good. I’ll update the KIP accordingly.


Thanks,
Andrew


On 10 Oct 2023, at 23:01, Matthias J. Sax  wrote:

Andrew,

yes I would like to get this change into KIP-714 right way. Seems to

be

important, as we don't know if/when a follow-up KIP for Kafka Streams

would

land.


I was also thinking (and discussed with a few others) how to expose

it,

and we would propose the following:


We add a new method to `KafkaStreams` class:

public ClientsInstanceIds clientsInstanceIds(Duration timeout);

The returned object is like below:

  public class ClientsInstanceIds {
// we only have a single admin client per KS instance
String adminInstanceId();

// we only have a single global consumer per KS instance (if any)
// Optional<> because we might not have global-thread
Optional globalConsumerInstanceId();

// return a  ClientInstanceId> mapping
// for the underlying (restore-)consumers/producers
Map mainConsumerInstanceIds();
Map restoreConsumerInstanceIds();
Map producerInstanceIds();
}

For the `threadKey`, we would use some pattern like this:

  [Stream|StateUpdater]Thread-


Would this work from your POV?



-Matthias


On 10/9/23 2:15 AM, Andrew Schofield wrote:

Hi Matthias,
Good point. Makes sense to me.
Is this something that can also be included in the proposed Kafka

Streams follow-on KIP, or would you prefer that I add it to KIP-714?

I have a slight preference for the former to put all of the KS

enhancements into a separate KIP.

Thanks,
Andrew

On 7 Oct 2023, at 02:12, Matthias J. Sax  wrote:

Thanks Andrew. SGTM.

One point you did not address is the idea to add a method to

`KafkaStreams` similar to the proposed `clientInstanceId()` that will be
added to consumer/producer/admin clients.


Without addressing this, Kafka Streams users won't have a way to get

the assigned `instanceId` of the internally created clients, and thus it
would be very difficult for them to know which metrics that the broker
receives belong to a Kafka Streams app. It seems they would only find

the

`instanceIds` in the log4j output if they enable client logging?


Of course, because there is multiple clients inside Kafka Streams,

the return type cannot be an single "String", but must be som

Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-12 Thread Matthias J. Sax

Seems both Andrew and Jun prefer to merge the consumers. I am ok with this.

I'll leave it to Andrew to update the KIP accordingly, including adding 
`throws TimeoutException`.



-Matthias
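
For readers following along, a hedged sketch of how an application might end up calling the API being discussed (`clientInstanceIds` / `ClientInstanceIds` are the names proposed in this thread, not a released interface, and may still change):

```
// Sketch only: KafkaStreams#clientInstanceIds() and the ClientInstanceIds
// holder are the names proposed in this thread; `kafkaStreams` is an existing
// RUNNING instance. Per the discussion, a TimeoutException means no result.
import java.time.Duration;
import org.apache.kafka.common.errors.TimeoutException;

try {
    ClientInstanceIds ids = kafkaStreams.clientInstanceIds(Duration.ofSeconds(30));
    System.out.println("admin instance id: " + ids.adminInstanceId());
    ids.mainConsumerInstanceIds().forEach((threadKey, instanceId) ->
            System.out.println(threadKey + " main consumer -> " + instanceId));
} catch (TimeoutException e) {
    // per the thread above, no partial result is returned on timeout
}
```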

On 10/12/23 10:07 AM, Jun Rao wrote:

Hi, Matthias,

130. Yes, throwing an exception sounds reasonable. It would be useful to
document this.

131. I was thinking that we could just return all consumers (including the
global consumer) through Map consumerInstanceIds() and use
keys to identify each consumer instance. The benefit is that the
implementation (whether to use a separate global consumer or not) could
change in the future, but the API can remain the same. Another slight
benefit is that there is no need for returning Optional. If the
global consumer is not used, it just won't be included in the map.

Thanks,

Jun


On Thu, Oct 12, 2023 at 9:30 AM Matthias J. Sax  wrote:


Thanks Sophie and Jun.

`clientInstanceIds()` is fine with me -- was not sure about the double
plural myself.

Sorry if my comment was confusing. I was trying to say that adding an
overload to `KafkaStreams` that does not take a timeout parameter does
not make sense, because there is no `default.api.timeout.ms` config for
Kafka Streams, so users always need to pass in a timeout. (Same for the
producer.)

For the implementation, I think KS would always call
`client.clientInstanceId(timeout)` and never rely on
`default.api.timeout.ms` though, so we can stay in control -- if a
timeout is passed by the user, it would always overwrite
`default.api.timeout.ms` on the consumer/admin and thus we should follow
the same semantics in Kafka Streams, and overwrite it explicitly when
calling `client.clientInstanceId()`.

The proposed API also makes sense to me. I was just wondering if we want
to extend it for client users -- for KS we won't need/use the
timeout-less overloads.



130) My intent was to throw a TimeoutException if we cannot get all
instanceIds, because it's the standard contract for timeouts. It would
also be hard for a user to tell whether a full or partial result was
returned (or we add a method `boolean isPartialResult()` to make it
easier for users).

If there are concerns/objections, I am also ok with returning a partial
result -- it would require a change to the newly added `ClientInstanceIds`
return type -- for `adminInstanceId` we only return a `String` right now
-- we might need to change this to `Optional<String>` so we are able to
return a partial result?

131) Of course we could, but I am not sure what we would gain? In the
end, implementation details would always leak because if we change the
number of consumers we use, we would return different keys in the `Map`.
Atm, the proposal implies that the same key might be used for the "main"
and "restore" consumer of the same thread -- but we can make keys unique
by adding a `-restore` suffix to the restore-consumer key if we merge
both maps. -- Curious to hear what others think. I am very open to doing it
differently than currently proposed.


-Matthias


On 10/12/23 8:39 AM, Jun Rao wrote:

Hi, Matthias,

Thanks for the reply.

130. What would be the semantic? If the timeout has expired and only some
of the client instances' id have been retrieved, does the call return the
partial result or throw an exception?

131. Could we group all consumer instances in a single method since we

are

returning the key for each instance already? This probably also avoids
exposing implementation details that could change over time.

Thanks,

Jun

On Thu, Oct 12, 2023 at 12:00 AM Sophie Blee-Goldman <

sop...@responsive.dev>

wrote:


Regarding the naming, I personally think `clientInstanceId` makes sense

for

the plain clients
   -- especially if we might later introduce the notion of an
`applicationInstanceId`.

I'm not a huge fan of `clientsInstanceIds` for the Kafka Streams API,
though, can we use
`clientInstanceIds` instead? (The difference being the placement of the
plural 's')
I would similarly rename the class to just ClientInstanceIds

we can also not have a timeout-less overload,  because `KafkaStreams`

does

not have a `default.api.timeout.ms` config either


With respect to the timeout for the Kafka Streams API, I'm a bit

confused

by the
doubletriple-negative of Matthias' comment here, but I was thinking

about

this
earlier and this was my take: with the current proposal, we would allow
users to pass
in an absolute timeout as a parameter that would apply to the method as

a

whole.
Meanwhile within the method we would issue separate calls to each of the
clients using
the default or user-configured value of their  `default.api.timeout.ms`

as

the timeout
parameter.

So the API as proposed makes sense to me.


On Wed, Oct 11, 2023 at 6:48 PM Matthias J. Sax 

wrote:



I can answer 130 and 131.

130) We cannot guarantee that all clients are already initialized due

to

race conditions. We plan to not allow calling
`KafkaStreams#clientsInstanceIds()` when the state is not RUNNI

[jira] [Updated] (KAFKA-15594) Add 3.6.0 to streams upgrade/compatibility tests

2023-10-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15594:

Component/s: streams
 system tests

> Add 3.6.0 to streams upgrade/compatibility tests
> 
>
> Key: KAFKA-15594
> URL: https://issues.apache.org/jira/browse/KAFKA-15594
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, system tests
>Reporter: Satish Duggana
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-12 Thread Matthias J. Sax

Thanks Sophie and Jun.

`clientInstanceIds()` is fine with me -- was not sure about the double 
plural myself.


Sorry if my comment was confusing. I was trying to say that adding an 
overload to `KafkaStreams` that does not take a timeout parameter does 
not make sense, because there is no `default.api.timeout.ms` config for 
Kafka Streams, so users always need to pass in a timeout. (Same for the 
producer.)


For the implementation, I think KS would always call 
`client.clientInstanceId(timeout)` and never rely on 
`default.api.timeout.ms` though, so we can stay in control -- if a 
timeout is passed by the user, it would always overwrite 
`default.api.timeout.ms` on the consumer/admin and thus we should follow 
the same semantics in Kafka Streams, and overwrite it explicitly when 
calling `client.clientInstanceId()`.


The proposed API also makes sense to me. I was just wondering if we want 
to extend it for client users -- for KS we won't need/use the 
timeout-less overloads.




130) My intent was to throw a TimeoutException if we cannot get all 
instanceIds, because it's the standard contract for timeouts. It would 
also be hard for a user to tell whether a full or partial result was 
returned (or we add a method `boolean isPartialResult()` to make it 
easier for users).


If there are concerns/objections, I am also ok with returning a partial 
result -- it would require a change to the newly added `ClientInstanceIds` 
return type -- for `adminInstanceId` we only return a `String` right now 
-- we might need to change this to `Optional<String>` so we are able to 
return a partial result?



131) Of course we could, but I am not sure what we would gain? In the 
end, implementation details would always leak because if we change the 
number of consumers we use, we would return different keys in the `Map`. 
Atm, the proposal implies that the same key might be used for the "main" 
and "restore" consumer of the same thread -- but we can make keys unique 
by adding a `-restore` suffix to the restore-consumer key if we merge 
both maps. -- Curious to hear what others think. I am very open to doing it 
differently than currently proposed.



-Matthias


On 10/12/23 8:39 AM, Jun Rao wrote:

Hi, Matthias,

Thanks for the reply.

130. What would be the semantic? If the timeout has expired and only some
of the client instances' id have been retrieved, does the call return the
partial result or throw an exception?

131. Could we group all consumer instances in a single method since we are
returning the key for each instance already? This probably also avoids
exposing implementation details that could change over time.

Thanks,

Jun

On Thu, Oct 12, 2023 at 12:00 AM Sophie Blee-Goldman 
wrote:


Regarding the naming, I personally think `clientInstanceId` makes sense for
the plain clients
  -- especially if we might later introduce the notion of an
`applicationInstanceId`.

I'm not a huge fan of `clientsInstanceIds` for the Kafka Streams API,
though, can we use
`clientInstanceIds` instead? (The difference being the placement of the
plural 's')
I would similarly rename the class to just ClientInstanceIds

we can also not have a timeout-less overload,  because `KafkaStreams` does

not have a `default.api.timeout.ms` config either


With respect to the timeout for the Kafka Streams API, I'm a bit confused
by the
doubletriple-negative of Matthias' comment here, but I was thinking about
this
earlier and this was my take: with the current proposal, we would allow
users to pass
in an absolute timeout as a parameter that would apply to the method as a
whole.
Meanwhile within the method we would issue separate calls to each of the
clients using
the default or user-configured value of their  `default.api.timeout.ms` as
the timeout
parameter.

So the API as proposed makes sense to me.


On Wed, Oct 11, 2023 at 6:48 PM Matthias J. Sax  wrote:


I can answer 130 and 131.

130) We cannot guarantee that all clients are already initialized due to
race conditions. We plan to not allow calling
`KafkaStreams#clientsInstanceIds()` when the state is not RUNNING (or
REBALANCING) though -- guess this slipped on the KIP and should be
added? But because StreamThreads can be added dynamically (and producer
might be created dynamically at runtime; cf below), we still cannot
guarantee that all clients are already initialized when the method is
called. Of course, we assume that all clients are most likely initialize
on the happy path, and blocking calls to `client.clientInstanceId()`
should be rare.

To address the worst case, we won't do a naive implementation and just
loop over all clients, but fan-out the call to the different
StreamThreads (and GlobalStreamThread if it exists), and use Futures to
gather the results.

Currently, `StreamThreads` has 3 clients (if ALOS or EOSv2 is used), so
we might do 3 blocking calls in the worst case (for EOSv1 we get a
producer per tasks, and we might end up doing more blocking calls if the
producers are

Re: [DISCUSS] KIP-985 Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-11 Thread Matthias J. Sax

Quick addendum.

Some DSL operators use `range` and seem to rely on ascending order, 
too... Of course, if we say `range()` has no ordering guarantee, we 
would add `forwardRange()` and let the DSL use `forwardRange`, too.


The discussion of course also applies to `all()` and `reverseAll()`, 
and I assume also `prefixScan()` (even if there is no "reverse" version 
for it).



On 10/11/23 10:22 PM, Matthias J. Sax wrote:

Thanks for raising this question Hanyu. Great find!

My interpretation is as follows (it's actually a warning sign that the 
API contract is not well defined, and we should fix this by extending 
the JavaDocs and the docs on the web page about it).


We have existing `range()` and `reverseRange()` methods on 
`ReadOnlyKeyValueStore` -- the interface itself is not typed (ie, just 
generics), and we state that we don't guarantee "logical order" because 
underlying stores are based on `byte[]` type. So far so... well.


However, to make matters worse, we are also not explicit about whether the 
underlying store implementation *must* return keys in 
byte[]-lexicographical order or not...


For `range()`, I would be kinda willing to accept that there is no 
ordering guarantee at all -- for example, if the underlying byte[]-store 
is hash-based and implements a full scan to answer a `range()` it might 
not be efficient, but also not incorrect if keys are returned in some 
"random" (byte[]-)order. In isolation, I don't see an API contract 
violation.


However, `reverseRange` implicitly states with its name that some 
"descending order" (based on keys) is expected. Given the JavaDoc comment 
about "logical" vs "byte[]" order, the contract (at least to me) is 
clear: it returns records in descending byte[]-lexicographical order. -- 
Any other interpretation seems to be off? Curious to hear if you agree 
or disagree with this interpretation?




If this is correct, it means we are actually lacking an API contract for 
an ascending byte[]-lexicographical range scan. Furthermore, a hash-based 
byte[]-store would need to explicitly sort its results for 
`reverseRange` to not violate the contract.


To me, this raises the question if `range()` actually has a 
(non-explicit) contract about returning data in byte[]-lexicographical 
order? It seems a lot of people rely on this, and our default stores 
actually implement it this way. So if we don't look at `range()` in 
isolation, but look at the `ReadOnlyKeyValueStore` interface 
holistically, I would also buy the argument that `range()` implies 
"ascending byte[]-lexicographical order". Thoughts?


To be frank: to me, it's pretty clear that the original idea to add 
`range()` was to return data in ascending order.



Question 1:
  - Do we believe that the range() contract is ascending 
byte[]-lexicographical order right now?


    If yes, I would propose to make it explicit in the JavaDocs.

    If no, I would also propose to make it explicit in the JavaDocs. In 
addition, it raises the question if a method `forwardRange()` (for the 
lack of a better idea about a name right now) is actually missing to 
provide such a contract?



Of course, we always depend on the serialization format for order, and 
if users need "logical order" they need to ensure to use a serialization 
format that align byte[]-lexicographical order to logical order. But for 
the scope of this work, I would not even try to open this can of worms...





Looking into `RangeQuery`, the JavaDocs don't say anything about order. 
Thus, `RangeQuery#range()` could, it seems, also be implemented by calling 
`reverseRange()` without violating the contract. A hash-based 
store could also implement it, without the need to explicitly sort...


Which brings me back to my original thought about having three types of 
results for `Range`:

  - no ordering guarantee
  - ascending (we would only give byte[]-lexicographical order)
  - descending (we would only give byte[]-lexicographical order)

Again, I actually believe that the original intent of RangeQuery was to 
inherit the ascending order of `ReadOnlyKeyValueStore#range()`... Please 
keep me honest about it.  On the other hand, both APIs seem to be 
independent enough to not couple them... -- this could actually be a 
step in the right direction and would follow the underlying idea of 
IQv2 to begin with: decouple semantics for the store interfaces from the 
query types and semantics...



OR: we actually say that `RangeQuery#range` did implicitly inherit the 
(non explicit) "ascending byte[]-lexicographical" order of the 
underlying `ReadOnlyKeyValueStore`, and we just need to update the 
(Java)Docs to make it explicit. -- But it might go against the idea of 
IQv2 as stated above.



Furthermore, the consequence would be, that a potential custom 
hash-based store, would need to do extra work to `range()` to do the 
sorting (or of course might

Re: [DISCUSS] KIP-985 Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-11 Thread Matthias J. Sax
s.

Sincerely,
Hanyu

On Thu, Oct 5, 2023 at 9:52 AM Hanyu (Peter) Zheng <

pzh...@confluent.io>

wrote:


Hi, Hao,

In this case, it will return an empty set or list in the end.

Sincerely,
Hanyu

On Wed, Oct 4, 2023 at 10:29 PM Matthias J. Sax 

wrote:



Great discussion!

It seems the only open question might be about ordering guarantees?
IIRC, we had a discussion about this in the past.


Technically (at least from my POV), existing `RangeQuery` does not have
a guarantee that data is returned in any specific order (not even on a
per-partition basis). It just happens that RocksDB (and, as pointed out by
Hanyu already, also the built-in in-memory store that is based on a
tree-map) allows us to return data ordered by key; as mentioned already,
this guarantee is limited to a per-partition basis.

If there were a custom store based on a hashed key-value store, this
store could implement RangeQuery and return data (even for a single
partition) with no ordering, without violating the contract.

Thus, it could actually make sense to extend `RangeQuery` and allow
three options: no-order, ascending, descending. For our existing
Rocks/InMemory implementations, no-order could be equal to ascending and
nothing changes effectively, but it might be a better API contract?

--

If we assume that there might be a custom hash-based store, such a store
could reject a query if "ascending" is required, or might need to do
more work to implement it (up to the store maintainer). This is actually
the beauty of IQv2: different stores can pick which queries they want
to support.

From an API contract point of view, it seems confusing to say:
specifying nothing means no guarantee (or ascending if the store can
offer it), but descending we can explicitly request. Thus, a hash-based
store might be able to accept an "order not specified" query, but would
reject "descending". This seems to be somewhat unbalanced?

Thus, I am wondering if we should actually add `withAscendingKeys()`,
too, even if it won't impact our current RocksDB/In-Memory
implementations?

The second question is about per-partition or across-partition ordering:
it's not possible right now to actually offer across-partition ordering
the way IQv2 is set up. The reason is that the store that implements a
query type is always a single shard. Thus, the implementation does not
have access to other shards. It's hard-coded inside Kafka Streams to
query each shard, to "accumulate" partial results, and to return them
back to the user. Note that the API is:

StateQueryResult<R> result = KafkaStreams.query(...);
Map<Integer, QueryResult<R>> resultPerPartitions = result.getPartitionResults();

Thus, if we would want to offer across-partition ordering, we cannot do
it right now, because Kafka Streams does not know anything about the
semantics of the query it distributes... -- the result is an unknown
type. We would need to extend IQv2 with an additional mechanism that
allows users to plug in more custom code to "merge" multiple partition
results into a "global result". This is clearly out of scope for this
KIP and would require a new KIP by itself.

It seems that this contract, which is independent of the query type, is
not well understood, and thus a big +1 to fix the documentation. I don't
think that this KIP must "define" anything, but it might of course be
worth adding an explanation of why the KIP cannot even offer global
ordering, as it's defined/limited by the IQv2 "framework" itself,
not the individual queries.



-Matthias




On 10/4/23 4:38 PM, Hao Li wrote:

Hi Hanyu,

Thanks for the KIP! Seems there are already a lot of good discussions. I
only have two comments:

1. Please make it clear in
```
  /**
   * Interactive range query using a lower and upper bound to filter the
keys returned.
   * @param lower The key that specifies the lower bound of the range
   * @param upper The key that specifies the upper bound of the range
   * @param <K> The key type
   * @param <V> The value type
   */
  public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
      return new RangeQuery<>(Optional.ofNullable(lower),
Optional.ofNullable(upper), true);
  }
```
that a `null` in the lower or upper parameter means it's unbounded.
2. What's the behavior if lower is 3 and upper is 1? Is it IllegalArgument
or will this return an empty result? Maybe also clarify this in the
document.

Thanks,
Hao
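
A small sketch of the two semantics touched on above (a null bound means "unbounded", and -- per Hanyu's earlier reply in this thread -- an inverted range simply yields an empty result):

```
// Sketch of the semantics discussed above for IQv2's RangeQuery:
// a null bound means "unbounded"; an inverted range (lower > upper)
// returns an empty result rather than throwing.
import org.apache.kafka.streams.query.RangeQuery;

RangeQuery<Integer, String> fullScan  = RangeQuery.withRange(null, null); // both bounds open
RangeQuery<Integer, String> upToThree = RangeQuery.withRange(null, 3);    // no lower bound
RangeQuery<Integer, String> inverted  = RangeQuery.withRange(3, 1);       // empty result
```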


On Wed, Oct 4, 2023 at 9:27 AM Hanyu (Peter) Zheng
 wrote:


For testing purposes, we previously used a Set to record the results in
IQv2StoreIntegrationTest. Let's take an example where we now have two
partitions and four key-value pairs: <0,0> in p0, <1,1> in p1, <2,2> in p0,
and <3,3> in p1.

If we execute withRange(1,3), it will return a Set of <1, 2, 3>. However,
if we run withRange(1,3).wit

Re: [DISCUSS] KIP-969: Support range interactive queries for versioned state stores

2023-10-11 Thread Matthias J. Sax

Thanks for the update.



To retrieve

the latest value(s), the user must call just the asOf method with the MAX
value (asOf(MAX)). The same applies to KIP-968. Do you think it is clumsy,
Matthias?



Well, in KIP-968 calling `asOf` and passing in a timestamp is optional, 
and default is "latest", right? So while `asOf(MAX)` does the same 
thing, practically users would never call `asOf` for a "latest" query?


In this KIP, we enforce that users give us a key range (we have the 4 
static entry point methods to define a query for this), and the time 
range defaults to "no bounds".


The existing `RangeQuery` allows querying a range of keys for existing 
stores. It seems to be a common pattern to query a key range on latest. 
-- in the current proposal, users would need to do:


MultiVersionedRangeQuery.withKeyRange(startKey, endKey).asOf(MAX);

Would like to hear from others if we think that's a good user experience? 
If we agree to accept this, I think we should explain how to do this in 
the JavaDocs (and also the regular docs... --- otherwise, I can already 
anticipate user questions on all question-asking channels about how to do a 
"normal key range query". IMHO, the problem is not that the code itself is 
too clumsy, but that it's totally not obvious to users how to express 
it without actually explaining it to them. It basically violates the API 
design rule "make it easy to use / simple things should be easy".


Btw: We could also re-use `RangeQuery` and add an implementation to 
`VersionedStateStore` to just accept this query type, with "key range 
over latest" semantics. -- The issue is of course that users need to 
know that the query would return `ValueAndTimestamp` and not plain `V` 
(or we add a translation step to unwrap the value, but we would lose the 
"validFrom" timestamp -- validTo would be `null`). Because type safety 
is a general issue in IQv2 it would not make it worse (in the strict 
sense), but I am also not sure if we want to dig an even deeper hole...



-Matthias


On 10/10/23 11:55 AM, Alieh Saeedi wrote:

Thanks, Matthias and Bruno, for the feedback on KIP-969. Here is a summary
of the updates I made to the KIP:

1.  I liked the idea of renaming methods as Matthias suggested.
2. I removed the allversions() method as I did in KIP-968. To retrieve
the latest value(s), the user must call just the asOf method with the MAX
value (asOf(MAX)). The same applies to KIP-968. Do you think it is clumsy,
Matthias?
3. I added a method to the *VersionedKeyValueStore *interface, as I did
for KIP-968.
4. Matthias: I do not get what you mean by your second comment. Isn't
the KIP already explicit about that?

> I assume, results are returned by timestamp for each key. The KIP
should be explicit about it.


Cheers,
Alieh



On Tue, Oct 3, 2023 at 6:07 AM Matthias J. Sax  wrote:


Thanks for updating the KIP.

Not sure if I agree or not with Bruno's idea to split the query types
further? In the end, we split them only because there is three different
return types: single value, value-iterator, key-value-iterator.

What do we gain by splitting out single-ts-range-key? In the end, for
range-ts-range-key the proposed class is necessary and is a superset
(one can set both timestamps to the same value, for single-ts lookup).

The mentioned simplification might apply to "single-ts-range-key" but I
don't see a simplification for the proposed (and necessary) query type?

On the other hand, I see an advantage of a single-ts-range-key for
querying over the "latest version" with a range of keys. For a
single-ts-range-key query, this would be the default (similar to
VersionedKeyQuery with no asOf timestamp defined).

In the current version of the KIP (if we agree that the default should
actually return "all versions" not "latest" -- this default was
suggested by Bruno on KIP-968 and makes sense to me, so we would need to
have the same default here to stay consistent), users would need to pass
in `from(Long.MAX).to(Long.MAX)` (if I got this right) to query the
latest point in time only, which seems clumsy? Or we could add a
`latestKeyOnly` option to `MultiVersionedRangeQuery`, but it does seem
a little clumsy, too.




The overall order of the returned records is by Key


I assume, results are returned by timestamp for each key. The KIP should
be explicit about it.



To be very explicit, should we rename the methods to specify the key bound?

   - withRange -> withKeyRange
   - withLowerBound -> withLowerKeyBound
   - withUpperBound -> withUpperKeyBound
   - withNoBounds -> allKeys (or withNoKeyBounds, but we use
`allVersions` and not `noTimeBound` and should align the naming?)



-Matthias


On 9/6/23 5:25 AM, Bruno Cadonna wrote:

Hi Alieh,

Thanks for the KIP!

One high level comment/question:

I assume you se

Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-10-11 Thread Matthias J. Sax
ery 
of KIP-960 and then one with a key and a time range. When you iterate 
over the results you could also call validTo(). Maybe add some actual 
records in the comments to show what the result might look like.


Regarding the test plan, I hope you also plan to add unit tests in all 
of your KIPs. Maybe you could also explain why system tests are not 
needed here.


Best,
Bruno

On 10/10/23 5:36 PM, Alieh Saeedi wrote:

Thank you all for the very exact and constructive comments. I really
enjoyed reading your ideas and all the important points you made me aware
of. I updated KIP-968 as follows:

    1. If the key or time bounds are null, the method returns NPE.
    2. The "valid" word: I removed the sentence "all the records that are
    valid..." and replaced it with an exact explanation. Moreover, I explained
    it with an example in the KIP but not in the javadocs. Do I need to add the
    example to the javadocs as well?
    3. Since I followed Bruno's suggestion and removed the allVersions()
    method, the problem of meaningless combinations is solved, and I 
do not
    need any IllegalArgumentException or something like that. 
Therefore, the
    change is that if no time bound is specified, the query returns 
the records

    with the specified key for all timestamps (all versions).
    4. As Victoria suggested, adding a method to the 
*VersionedKeyValueStore

    *interface is essential. So I did that. I had this method only in the
    RocksDBVersionedStore class, which was not enough.
    5. I added the *validTo* field to the VersionedRecord class to be 
able
    to represent the tombstones. As you suggested, we postpone solving 
the

    problem of retrieving consecutive tombstones for later.
    6. I added the "Test Plan" section to all KIPs. I hope what I 
wrote is

    convincing.
    7. I added the *withAscendingTimestamp()* method to provide more
code readability
    for the user.
    8. I removed the evil word "get" from all getter methods.

There have also been some more suggestions which I am still not convinced
or clear about them:

    1. Regarding asOf vs until: reading all comments, my conclusion 
was that
    I keep it as "asOf" (following Walker's idea as the native speaker 
as well
    as Bruno's suggestion to be consistent with single-key_single_ts 
queries).
    But I do not have a personal preference. If you insist on "until", 
I change

    it.
    2. Bruno suggested renaming the class "MultiVersionedKeyQuery" to sth
    else. We already had a long discussion about the name with 
Matthias. I am

    open to renaming it to something else, but do you have any ideas?
    3. Matthias suggested having a method with two input parameters that
    enables the user to specify both time bounds in the same method. 
Isn't it
    introducing redundancy? It is somehow disrespectful to the idea of 
having

    composable methods.
    4. Bruno suggested renaming the methods "asOf" and "from" to 
"asOfTime"

    and "fromTime". If I do that, then it is not consistent with KIP-960.
    Moreover, the input parameter is clearly a timestamp, which explains
    enough. What do you think about that?
    5. I was asked to add more examples to the example section. My 
question
    is, what is the main purpose of that? If I know it clearly, then I 
can add

    what you mean.



Cheers,
Alieh

On Tue, Oct 10, 2023 at 1:13 AM Matthias J. Sax  wrote:


Bruno and I had some background conversation about the `get` prefix
question including a few other committers.

The official policy was never changed, and we should not add the
`get`-prefix. It was a slip on our side in previous KIPs to add the
`get`-prefix, and we should actually clean it up with a follow-up KIP.


-Matthias


On 10/5/23 5:26 AM, Bruno Cadonna wrote:

Hi Matthias,

Given all the IQv2 KIPs that use getX and given recent PRs (internal
interfaces mainly) that got merged, I was under the impression that we
moved away from the strict no-getX policy.

I do not think it was an accident using getX in the IQv2 KIPs since
somebody would have brought it up, otherwise.

I am fine with both types of getters.

If we think, we need to discuss this in a broader context, let's 
start a

separate thread.


Best,
Bruno





On 10/5/23 7:44 AM, Matthias J. Sax wrote:

I agree to (almost) everything what Bruno said.



In general, we tend to move away from using getters without "get",
recently. So I would keep the "get".


This is new to me? Can you elaborate on this point? Why do you think
that's the case?

I actually did realize (after Walker mentioned it) that existing query
types use the `get` prefix, but to me it seems that it was by accident and
we should consider correcting it? Thus, I would actually prefer to not
add the `get` prefix for new query types.

IMHO, we should do a follow up KIP to deprecate all methods with `ge

Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-11 Thread Matthias J. Sax

I can answer 130 and 131.

130) We cannot guarantee that all clients are already initialized due to 
race conditions. We plan to not allow calling 
`KafkaStreams#clientsInstanceIds()` when the state is not RUNNING (or 
REBALANCING) though -- guess this slipped in the KIP and should be 
added? But because StreamThreads can be added dynamically (and producers 
might be created dynamically at runtime; cf below), we still cannot 
guarantee that all clients are already initialized when the method is 
called. Of course, we assume that all clients are most likely initialized 
on the happy path, and blocking calls to `client.clientInstanceId()` 
should be rare.


To address the worst case, we won't do a naive implementation and just 
loop over all clients, but fan-out the call to the different 
StreamThreads (and GlobalStreamThread if it exists), and use Futures to 
gather the results.
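
As a generic illustration of that fan-out idea (this is not Kafka Streams internals, just a sketch of the pattern: run the potentially blocking per-client lookups concurrently and gather them under one shared deadline):

```
// Generic sketch of the fan-out pattern described above (not actual Kafka
// Streams code): each supplier stands in for a blocking clientInstanceId()
// call; all of them share a single deadline.
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Supplier;

static Map<String, String> fanOut(Map<String, Supplier<String>> lookups, Duration timeout)
        throws InterruptedException, ExecutionException, TimeoutException {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, lookups.size()));
    try {
        Map<String, Future<String>> futures = new HashMap<>();
        lookups.forEach((key, lookup) -> futures.put(key, pool.submit((Callable<String>) lookup::get)));

        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, Future<String>> entry : futures.entrySet()) {
            long remaining = deadlineNanos - System.nanoTime();
            result.put(entry.getKey(),
                       entry.getValue().get(Math.max(0, remaining), TimeUnit.NANOSECONDS));
        }
        return result;
    } finally {
        pool.shutdownNow();
    }
}
```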


Currently, a `StreamThread` has 3 clients (if ALOS or EOSv2 is used), so 
we might do 3 blocking calls in the worst case (for EOSv1 we get a 
producer per task, and we might end up doing more blocking calls if the 
producers are not initialized yet). Note that EOSv1 is already 
deprecated, and we are also working on thread refactoring that will 
reduce the number of clients per StreamThread to 2 -- and we have more 
refactoring planned to reduce the number of clients even further.


Inside `KafkaStreams#clientsInstanceIds()` we might only do a single 
blocking call for the admin client (ie, `admin.clientInstanceId()`).


I agree that we need to do some clever timeout management, but it seems 
to be more of an implementation detail?


Do you have any particular concerns, or does the proposed implementation 
as sketched above address your question?



131) If the Topology does not have a global-state-store, there won't be 
a GlobalThread and thus no global consumer. Thus, we return an Optional.




Three related questions for Andrew.

(1) Why is the method called `clientInstanceId()` and not just plain 
`instanceId()`?


(2) Why do we return a `String` and not a UUID type? The added 
protocol request/response classes use UUIDs.


(3) Would it make sense to have an overloaded `clientInstanceId()` 
method that does not take any parameter but uses the `default.api.timeout.ms` 
config (this config does not exist on the producer though, so we could 
only have it for consumer and admin at this point)? We could of course 
also add overloads like this later if users request them (and/or add 
`default.api.timeout.ms` to the producer, too).


Btw: For KafkaStreams, I think `clientsInstanceIds` still makes sense as 
a method name though, as `KafkaStreams` itself does not have an 
`instanceId` -- we can also not have a timeout-less overload, because 
`KafkaStreams` does not have a `default.api.timeout.ms` config either 
(and I don't think it makes sense to add one).




-Matthias

On 10/11/23 5:07 PM, Jun Rao wrote:

Hi, Andrew,

Thanks for the updated KIP. Just a few more minor comments.

130. KafkaStreams.clientsInstanceId(Duration timeout): Does it wait for all
consumer/producer/adminClient instances to be initialized? Are all those
instances created during KafkaStreams initialization?

131. Why does globalConsumerInstanceId() return Optional while
other consumer instances don't return Optional?

132. ClientMetricsSubscriptionRequestCount: Do we need this since we have a
set of generic metrics
(kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*) that
report Request rate for every request type?

Thanks,

Jun

On Wed, Oct 11, 2023 at 1:47 PM Matthias J. Sax  wrote:


Thanks!

On 10/10/23 11:31 PM, Andrew Schofield wrote:

Matthias,
Yes, I think that’s a sensible way forward and the interface you propose

looks good. I’ll update the KIP accordingly.


Thanks,
Andrew


On 10 Oct 2023, at 23:01, Matthias J. Sax  wrote:

Andrew,

yes I would like to get this change into KIP-714 right way. Seems to be

important, as we don't know if/when a follow-up KIP for Kafka Streams would
land.


I was also thinking (and discussed with a few others) how to expose it,

and we would propose the following:


We add a new method to `KafkaStreams` class:

 public ClientsInstanceIds clientsInstanceIds(Duration timeout);

The returned object is like below:

   public class ClientsInstanceIds {
 // we only have a single admin client per KS instance
 String adminInstanceId();

 // we only have a single global consumer per KS instance (if any)
 // Optional<> because we might not have global-thread
 Optional globalConsumerInstanceId();

 // return a  ClientInstanceId> mapping
 // for the underlying (restore-)consumers/producers
 Map mainConsumerInstanceIds();
 Map restoreConsumerInstanceIds();
 Map producerInstanceIds();
}

For the `threadKey`, we would use some pattern like this:

   [Stream|StateUpdater]Thread-


Would this work from your POV?



-Matthias


On 10/9/23 2:15 AM, Andrew Schofield wrote:

Hi Matth

Re: [VOTE] KIP-714: Client metrics and observability

2023-10-11 Thread Matthias J. Sax

+1 (binding)

On 9/13/23 5:48 PM, Jason Gustafson wrote:

Hey Andrew,

+1 on the KIP. For many users of Kafka, it may not be fully understood how
much of a challenge client monitoring is. With tens of clients in a
cluster, it is already difficult to coordinate metrics collection. When
there are thousands of clients, and when the cluster operator has no
control over them, it is essentially impossible. For the fat clients that
we have, the lack of useful telemetry is a huge operational gap.
Consistency between clients has also been a major challenge. I think the
effort toward standardization in this KIP will have some positive impact
even in deployments which have effective client-side monitoring. Overall, I
think this proposal will provide a lot of value across the board.

Best,
Jason

On Wed, Sep 13, 2023 at 9:50 AM Philip Nee  wrote:


Hey Andrew -

Thank you for taking the time to reply to my questions. I'm just adding
some notes to this discussion.

1. epoch: It can be helpful to know the delta of the client side and the
actual leader epoch.  It is helpful to understand why sometimes commit
fails/client not making progress.
2. Client connection: If the client selects the "wrong" connection to push
out the data, I assume the request would timeout; which should lead to
disconnecting from the node and reselecting another node as you mentioned,
via the least loaded node.

Cheers,
P


On Tue, Sep 12, 2023 at 10:40 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:


Hi Philip,
Thanks for your vote and interest in the KIP.

KIP-714 does not introduce any new client metrics, and that’s intentional.
It does describe how all of the client metrics can have their names transformed
into equivalent "telemetry metric names”, and then potentially used in metrics
subscriptions.

I am interested in the idea of the client’s leader epoch in this context, but
I don’t have an immediate plan for how best to do this, and it would take
another KIP to enhance existing metrics or introduce some new ones. Those
would then naturally be applicable to the metrics push introduced in KIP-714.

In a similar vein, there are no existing client metrics specifically for
auto-commit. We could add them to Kafka, but I really think this is just an
example of asynchronous commit in which the application has decided not to
specify when the commit should begin.

It is possible to increase the cadence of pushing by modifying the interval.ms
configuration property of the CLIENT_METRICS resource.

There is an “assigned-partitions” metric for each consumer, but not one for
active partitions. We could add one, again as a follow-on KIP.

I take your point about holding on to a connection in a channel which might
experience congestion. Do you have a suggestion for how to improve on this?
For example, the client does have the concept of a least-loaded node. Maybe
this is something we should investigate in the implementation and decide on
the best approach. In general, I think sticking with the same node for
consecutive pushes is best, but if you choose the “wrong” node to start with,
it’s not ideal.

Thanks,
Andrew


On 8 Sep 2023, at 19:29, Philip Nee  wrote:

Hey Andrew -

+1 but I don't have a binding vote!

It took me a while to go through the KIP. Here are some of my notes during
the reading:

*Metrics*
- Should we care about the client's leader epoch? There is a case where the
user recreates the topic, but the consumer thinks it is still the same
topic and therefore, attempts to start from an offset that doesn't exist.
KIP-848 addresses this issue, but I can still see some potential benefits
from knowing the client's epoch information.
- I assume poll idle is similar to poll interval: I needed to read the
description a few times.
- I don't have a clear use case in mind for the commit latency, but I do
think sometimes people lack clarity about how much progress was tracked by
the auto-commit. Would tracking auto-commit-related metrics be useful? I
was thinking: the last offset committed or the actual cadence in ms.
- Are there cases when we need to increase the cadence of telemetry data
push? i.e. variable interval.
- Thanks for implementing the randomized initial metric push; I think it is
really important.
- Is there a potential use case for tracking the number of active
partitions? The consumer can pause partitions via API, during revocation,
or during offset reset for the stream.

*Connections*:
- The KIP stated that it will keep the same connection until the connection
is disconnected. I wonder if that could potentially cause congestion if it
is already a busy channel, which leads to connection timeout and
subsequently disconnection.

Thanks,
P

On Fri, Sep 8, 2023 at 4:15 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:


Bumping the voting thread for KIP-714.

So far, we have:
Non-binding +2 (Milind and Kirk), non-binding -1 (Ryanne)

Thanks,
Andrew


On 4 Aug 2023, at 

Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-11 Thread Matthias J. Sax

Thanks!

On 10/10/23 11:31 PM, Andrew Schofield wrote:

Matthias,
Yes, I think that’s a sensible way forward and the interface you propose looks 
good. I’ll update the KIP accordingly.

Thanks,
Andrew


On 10 Oct 2023, at 23:01, Matthias J. Sax  wrote:

Andrew,

yes I would like to get this change into KIP-714 right away. Seems to be 
important, as we don't know if/when a follow-up KIP for Kafka Streams would 
land.

I was also thinking (and discussed with a few others) how to expose it, and we 
would propose the following:

We add a new method to `KafkaStreams` class:

public ClientsInstanceIds clientsInstanceIds(Duration timeout);

The returned object is like below:

  public class ClientsInstanceIds {
// we only have a single admin client per KS instance
String adminInstanceId();

// we only have a single global consumer per KS instance (if any)
// Optional<> because we might not have global-thread
Optional<String> globalConsumerInstanceId();

// return a <threadKey, clientInstanceId> mapping
// for the underlying (restore-)consumers/producers
Map<String, String> mainConsumerInstanceIds();
Map<String, String> restoreConsumerInstanceIds();
Map<String, String> producerInstanceIds();
}

For the `threadKey`, we would use some pattern like this:

   [Stream|StateUpdater]Thread-<index>


Would this work from your POV?



-Matthias


On 10/9/23 2:15 AM, Andrew Schofield wrote:

Hi Matthias,
Good point. Makes sense to me.
Is this something that can also be included in the proposed Kafka Streams 
follow-on KIP, or would you prefer that I add it to KIP-714?
I have a slight preference for the former to put all of the KS enhancements 
into a separate KIP.
Thanks,
Andrew

On 7 Oct 2023, at 02:12, Matthias J. Sax  wrote:

Thanks Andrew. SGTM.

One point you did not address is the idea to add a method to `KafkaStreams` 
similar to the proposed `clientInstanceId()` that will be added to 
consumer/producer/admin clients.

Without addressing this, Kafka Streams users won't have a way to get the 
assigned `instanceId` of the internally created clients, and thus it would be 
very difficult for them to know which metrics that the broker receives belong 
to a Kafka Streams app. It seems they would only find the `instanceIds` in the 
log4j output if they enable client logging?

Of course, because there are multiple clients inside Kafka Streams, the return type cannot be a 
single "String", but must be some complex data structure -- we could either add 
a new class, or return a Map using a client key that maps to the 
`instanceId`.

For example we could use the following key:

   [Global]StreamThread[-<index>][-restore][consumer|producer]

(Of course, only the valid combination.)

Or maybe even better, we might want to return a `Future` because collecting all 
the `instanceId`s might be a blocking call on each client? I already have a few 
ideas how it could be implemented but I don't think it must be discussed on the 
KIP, as it's an implementation detail.

Thoughts?


-Matthias

On 10/6/23 4:21 AM, Andrew Schofield wrote:

Hi Matthias,
Thanks for your comments. I agree that a follow-up KIP for Kafka Streams makes 
sense. This KIP currently has made a bit
of an effort to embrace KS, but it’s not enough by a long way.
I have removed `application.id`. This should be done 
properly in the follow-up KIP. I don’t believe there’s a downside to
removing it from this KIP.
I have reworded the statement about temporarily. In practice, the 
implementation of this KIP that’s going on while the voting
progresses happens to use delta temporality, but that’s an implementation 
detail. Supporting clients must support both
temporalities.
I thought about exposing the client instance ID as a metric, but non-numeric 
metrics are not usual practice and tools
do not universally support them. I don’t think the KIP is improved by adding 
one now.
I have also added constants for the various Config classes for 
ENABLE_METRICS_PUSH_CONFIG, including to
StreamsConfig. It’s best to be explicit about this.
Thanks,
Andrew

On 2 Oct 2023, at 23:47, Matthias J. Sax  wrote:

Hi,

I did not pay attention to this KIP in the past; seems it was on-hold for a 
while.

Overall it sounds very useful, and I think we should extend this with a follow 
up KIP for Kafka Streams. What is unclear to me at this point is the statement:


Kafka Streams applications have an application.id configured and this 
identifier should be included as the application_id metrics label.


The `application.id` is currently only used as the (main) consumer's `group.id` 
(and is part of an auto-generated `client.id` if the user does not set one).

This comment related to:


The following labels should be added by the client as appropriate before 
metrics are pushed.


Given that Kafka Streams uses the consumer/producer/admin client as "black 
boxes", a client does at this point not know that it's part of a Kafka Streams 
application, and thus, it won't be able to a

[jira] [Commented] (KAFKA-15571) StateRestoreListener#onRestoreSuspended is never called because wrapper DelegatingStateRestoreListener doesn't implement onRestoreSuspended

2023-10-10 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773849#comment-17773849
 ] 

Matthias J. Sax commented on KAFKA-15571:
-

Oops... Thanks for reporting and the PR!

> StateRestoreListener#onRestoreSuspended is never called because wrapper 
> DelegatingStateRestoreListener doesn't implement onRestoreSuspended
> ---
>
> Key: KAFKA-15571
> URL: https://issues.apache.org/jira/browse/KAFKA-15571
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0, 3.5.1
>Reporter: Levani Kokhreidze
>Assignee: Levani Kokhreidze
>Priority: Major
>
> With https://issues.apache.org/jira/browse/KAFKA-10575 
> `StateRestoreListener#onRestoreSuspended` was added. But local tests show 
> that it is never called because `DelegatingStateRestoreListener` was not 
> updated to call a new method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-10 Thread Matthias J. Sax

Andrew,

yes I would like to get this change into KIP-714 right away. Seems to be 
important, as we don't know if/when a follow-up KIP for Kafka Streams 
would land.


I was also thinking (and discussed with a few others) how to expose it, 
and we would propose the following:


We add a new method to `KafkaStreams` class:

public ClientsInstanceIds clientsInstanceIds(Duration timeout);

The returned object is like below:

  public class ClientsInstanceIds {
// we only have a single admin client per KS instance
String adminInstanceId();

// we only have a single global consumer per KS instance (if any)
// Optional<> because we might not have global-thread
Optional<String> globalConsumerInstanceId();

// return a <threadKey, clientInstanceId> mapping
// for the underlying (restore-)consumers/producers
Map<String, String> mainConsumerInstanceIds();
Map<String, String> restoreConsumerInstanceIds();
Map<String, String> producerInstanceIds();
}

For the `threadKey`, we would use some pattern like this:

  [Stream|StateUpdater]Thread-<index>


Would this work from your POV?



-Matthias


On 10/9/23 2:15 AM, Andrew Schofield wrote:

Hi Matthias,
Good point. Makes sense to me.

Is this something that can also be included in the proposed Kafka Streams 
follow-on KIP, or would you prefer that I add it to KIP-714?
I have a slight preference for the former to put all of the KS enhancements 
into a separate KIP.

Thanks,
Andrew


On 7 Oct 2023, at 02:12, Matthias J. Sax  wrote:

Thanks Andrew. SGTM.

One point you did not address is the idea to add a method to `KafkaStreams` 
similar to the proposed `clientInstanceId()` that will be added to 
consumer/producer/admin clients.

Without addressing this, Kafka Streams users won't have a way to get the 
assigned `instanceId` of the internally created clients, and thus it would be 
very difficult for them to know which metrics that the broker receives belong 
to a Kafka Streams app. It seems they would only find the `instanceIds` in the 
log4j output if they enable client logging?

Of course, because there are multiple clients inside Kafka Streams, the return type cannot be a 
single "String", but must be some complex data structure -- we could either add 
a new class, or return a Map using a client key that maps to the 
`instanceId`.

For example we could use the following key:

   [Global]StreamThread[-<index>][-restore][consumer|producer]

(Of course, only the valid combination.)

Or maybe even better, we might want to return a `Future` because collecting all 
the `instanceId`s might be a blocking call on each client? I already have a few 
ideas how it could be implemented but I don't think it must be discussed on the 
KIP, as it's an implementation detail.

Thoughts?


-Matthias

On 10/6/23 4:21 AM, Andrew Schofield wrote:

Hi Matthias,
Thanks for your comments. I agree that a follow-up KIP for Kafka Streams makes 
sense. This KIP currently has made a bit
of an effort to embrace KS, but it’s not enough by a long way.
I have removed `application.id`. This should be done 
properly in the follow-up KIP. I don’t believe there’s a downside to
removing it from this KIP.
I have reworded the statement about temporarily. In practice, the 
implementation of this KIP that’s going on while the voting
progresses happens to use delta temporality, but that’s an implementation 
detail. Supporting clients must support both
temporalities.
I thought about exposing the client instance ID as a metric, but non-numeric 
metrics are not usual practice and tools
do not universally support them. I don’t think the KIP is improved by adding 
one now.
I have also added constants for the various Config classes for 
ENABLE_METRICS_PUSH_CONFIG, including to
StreamsConfig. It’s best to be explicit about this.
Thanks,
Andrew

On 2 Oct 2023, at 23:47, Matthias J. Sax  wrote:

Hi,

I did not pay attention to this KIP in the past; seems it was on-hold for a 
while.

Overall it sounds very useful, and I think we should extend this with a follow 
up KIP for Kafka Streams. What is unclear to me at this point is the statement:


Kafka Streams applications have an application.id configured and this 
identifier should be included as the application_id metrics label.


The `application.id` is currently only used as the (main) consumer's `group.id` 
(and is part of an auto-generated `client.id` if the user does not set one).

This comment related to:


The following labels should be added by the client as appropriate before 
metrics are pushed.


Given that Kafka Streams uses the consumer/producer/admin client as "black 
boxes", a client does at this point not know that it's part of a Kafka Streams 
application, and thus, it won't be able to attach any such label to the metrics it sends. 
(Also producer and admin don't even know the value of `application.id` -- only the (main) 
consumer, indirectly via `group.id`, but also restore and global consumer don't know it, 
because they don'

Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-10-09 Thread Matthias J. Sax
Bruno and I had some background conversation about the `get` prefix 
question, including a few other committers.


The official policy was never changed, and we should not add the 
`get`-prefix. It's a slip on our side in previous KIPs to add the 
`get`-prefix and we should actually clean it up with a follow-up KIP.



-Matthias


On 10/5/23 5:26 AM, Bruno Cadonna wrote:

Hi Matthias,

Given all the IQv2 KIPs that use getX and given recent PRs (internal 
interfaces mainly) that got merged, I was under the impression that we 
moved away from the strict no-getX policy.


I do not think it was an accident using getX in the IQv2 KIPs since 
somebody would have brought it up, otherwise.


I am fine with both types of getters.

If we think, we need to discuss this in a broader context, let's start a 
separate thread.



Best,
Bruno





On 10/5/23 7:44 AM, Matthias J. Sax wrote:

I agree to (almost) everything what Bruno said.


In general, we tend to move away from using getters without "get", 
recently. So I would keep the "get".


This is new to me? Can you elaborate on this point? Why do you think 
that's the case?


I actually did realize (after Walker mentioned it) that existing query 
types use `get` prefix, but to me it seems that it was by accident and 
we should consider correcting it? Thus, I would actually prefer to not 
add the `get` prefix for new query-type methods.


IMHO, we should do a follow up KIP to deprecate all methods with `get` 
prefix and replace them with new ones without `get` -- it's of course 
always kinda "unnecessary" noise, but if we don't do it, we might get 
into more and more inconsistent naming which would result in a "bad" API.


If we indeed want to change the convention and use the `get` prefix, I 
would strongly advocate to bite the bullet and do a KIP to pro-actively 
add the `get` "everywhere" it's missing... But overall, it seems to be 
a much broader decision, and we should get buy in from many committers 
about it -- as long as there is no broad consensus to add `get` 
everywhere, I would strongly prefer not to diverge from the current 
agreement to omit `get`.




-Matthias




On 10/4/23 2:36 AM, Bruno Cadonna wrote:

Hi,

Regarding tombstones:
As far as I understand, we need to add either a validTo field to 
VersionedRecord or we need to return tombstones, otherwise the result 
is not complete, because users could never know a record was deleted 
at some point before the second non-null value was put.
I like more adding the validTo field since it makes the result more 
concise and easier interpretable.


Extending on Victoria's example, with the following puts

put(k, v1, time=0)
put(k, null, time=5)
put(k, null, time=10)
put(k, null, time=15)
put(k, v2, time=20)

the result with tombstones would be

value, timestamp
(v1, 0)
(null, 5)
(null, 10)
(null, 15)
(v2, 20)

instead of

value, timestamp, validTo
(v1, 0, 5)
(v2, 20, null)

The benefit of conciseness would already apply to one single tombstone.
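
(Purely as an illustration of the idea, a hypothetical record shape carrying validTo could look like the sketch below; this is not the current VersionedRecord API, just a possible form of the field discussed above.)

public final class VersionedRecordWithValidTo<V> {
    private final V value;
    private final long timestamp;   // validFrom, as in the puts above
    private final Long validTo;     // null => still the latest version

    public VersionedRecordWithValidTo(final V value, final long timestamp, final Long validTo) {
        this.value = value;
        this.timestamp = timestamp;
        this.validTo = validTo;
    }

    public V value() { return value; }
    public long timestamp() { return timestamp; }
    public java.util.Optional<Long> validTo() { return java.util.Optional.ofNullable(validTo); }
}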

On the other hand, why would somebody write consecutive tombstones 
into a versioned state store? I guess if somebody does that on 
purpose, then there should be a way to retrieve each of those 
tombstones, right?
So maybe we need both -- validTo field and the option to return 
tombstones. The latter might be moved to a future KIP in case we see 
the need.



Regarding .within(fromTs, toTs):
I would keep it simple with .from() and .asOfTimestamp() (or 
.until()). If we go with .within(), I would opt for 
.withinTimeRange(fromTs, toTs), because the query becomes more readable:


MultiVersionedKeyQuery
   .withKey(1)
   .withinTimeRange(Instant.parse("2023-08-03T10:37:30.00Z"), 
        Instant.parse("2023-08-04T10:37:30.00Z"))


If we stay with .from() and .until(), we should consider .fromTime() 
and .untilTime() (or .toTime()):


MultiVersionedKeyQuery
  .withKey(1)
  .fromTime(Instant.parse("2023-08-03T10:37:30.00Z"))
  .untilTime(Instant.parse("2023-08-04T10:37:30.00Z"))



Regarding asOf vs. until:
I think asOf() is more used in point in time queries as Walker 
mentioned where this KIP specifies a time range. IMO asOf() fits very 
well with KIP-960 where one version is queried, but here I think 
.until() fits better. That might just be a matter of taste and in the 
end I am fine with both as long as it is well documented.



Regarding getters without "get":
In the other IQv2 classes we used getters with "get". In general, we 
tend to move away from using getters without "get", recently. So I 
would keep the "get".



Best,
Bruno

On 10/3/23 7:49 PM, Walker Carlson wrote:

Hey Alieh thanks for the KIP,

Weighing in on the AsOf vs Until debate I think either is fine from a
natural language perspective. Personally AsOf makes more sense to me where
until gives me the idea that the query is making a change. It's totally a
connotative difference and not that important. I think as of is pretty
frequently used in point of time queries.

Re: [VOTE] KIP-960: Support single-key_single-timestamp interactive queries (IQv2) for versioned state stores

2023-10-09 Thread Matthias J. Sax
One more nit: as discussed on the related KIP-968 thread, we should not 
use `get` as prefix for the getters.


So it should be `K key()` and `Optional<Instant> asOfTimestamp()`.


Otherwise the KIP LGTM.


+1 (binding)


-Matthias

On 10/6/23 2:50 AM, Alieh Saeedi wrote:

Hi everyone,

Since KIP-960 is reduced to the simplest IQ type and all further comments
are related to the following-up KIPs, I decided to finalize it at this
point.


A huge thank you to everyone who has reviewed this KIP (and also the
following-up ones), and
participated in the discussion thread!

I'd also like to thank you in advance for taking the time to vote.

Best,
Alieh



[jira] [Updated] (KAFKA-15569) Update test and add test cases in IQv2StoreIntegrationTest

2023-10-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15569:

Issue Type: Test  (was: Improvement)

> Update test and add test cases in IQv2StoreIntegrationTest
> --
>
> Key: KAFKA-15569
> URL: https://issues.apache.org/jira/browse/KAFKA-15569
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>
> Update test and add test cases in IQv2StoreIntegrationTest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15569) Update test and add test cases in IQv2StoreIntegrationTest

2023-10-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15569:

Component/s: streams
 unit tests

> Update test and add test cases in IQv2StoreIntegrationTest
> --
>
> Key: KAFKA-15569
> URL: https://issues.apache.org/jira/browse/KAFKA-15569
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>
> Update test and add test cases in IQv2StoreIntegrationTest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15378) Rolling upgrade system tests are failing

2023-10-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772748#comment-17772748
 ] 

Matthias J. Sax commented on KAFKA-15378:
-

This is a partial fix with regard to versions... 
[https://github.com/apache/kafka/pull/14490] (cf. other PR for older branches).

There is also a bug in the state updater that breaks some system tests – [~cadonna] 
already opened a PR for it: [https://github.com/apache/kafka/pull/14508]

I did identify another issue already and will open a PR after Bruno's PR is 
merged so I can get his fix upfront.

> Rolling upgrade system tests are failing
> 
>
> Key: KAFKA-15378
> URL: https://issues.apache.org/jira/browse/KAFKA-15378
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Affects Versions: 3.5.1
>Reporter: Lucas Brutschy
>    Assignee: Matthias J. Sax
>Priority: Major
>
> The system tests are having failures for these tests:
> {noformat}
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.1.2.to_version=3.6.0-SNAPSHOT
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.2.3.to_version=3.6.0-SNAPSHOT
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.6.0-SNAPSHOT
> {noformat}
> See 
> [https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5801/console]
>  for logs and other test data.
> Note that system tests currently only run with [this 
> fix](https://github.com/apache/kafka/commit/24d1780061a645bb2fbeefd8b8f50123c28ca94e),
>  I think some CVE python library update broke the system tests... 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15378) Rolling upgrade system tests are failing

2023-10-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15378:
---

Assignee: Matthias J. Sax

> Rolling upgrade system tests are failing
> 
>
> Key: KAFKA-15378
> URL: https://issues.apache.org/jira/browse/KAFKA-15378
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Affects Versions: 3.5.1
>Reporter: Lucas Brutschy
>    Assignee: Matthias J. Sax
>Priority: Major
>
> The system tests are having failures for these tests:
> {noformat}
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.1.2.to_version=3.6.0-SNAPSHOT
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.2.3.to_version=3.6.0-SNAPSHOT
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.6.0-SNAPSHOT
> {noformat}
> See 
> [https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5801/console]
>  for logs and other test data.
> Note that system tests currently only run with [this 
> fix](https://github.com/apache/kafka/commit/24d1780061a645bb2fbeefd8b8f50123c28ca94e),
>  I think some CVE python library update broke the system tests... 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-06 Thread Matthias J. Sax

Thanks Andrew. SGTM.

One point you did not address is the idea to add a method to 
`KafkaStreams` similar to the proposed `clientInstanceId()` that will be 
added to consumer/producer/admin clients.


Without addressing this, Kafka Streams users won't have a way to get the 
assigned `instanceId` of the internally created clients, and thus it 
would be very difficult for them to know which metrics that the broker 
receives belong to a Kafka Streams app. It seems they would only find 
the `instanceIds` in the log4j output if they enable client logging?


Of course, because there are multiple clients inside Kafka Streams, the 
return type cannot be a single "String", but must be some complex 
data structure -- we could either add a new class, or return a 
Map using a client key that maps to the `instanceId`.


For example we could use the following key:

   [Global]StreamThread[-<index>][-restore][consumer|producer]

(Of course, only the valid combination.)

Or maybe even better, we might want to return a `Future` because 
collecting all the `instanceId`s might be a blocking call on each client? 
I already have a few ideas how it could be implemented but I don't think 
it must be discussed on the KIP, as it's an implementation detail.


Thoughts?


-Matthias

On 10/6/23 4:21 AM, Andrew Schofield wrote:

Hi Matthias,
Thanks for your comments. I agree that a follow-up KIP for Kafka Streams makes 
sense. This KIP currently has made a bit
of an effort to embrace KS, but it’s not enough by a long way.

I have removed `application.id`. This should be done 
properly in the follow-up KIP. I don’t believe there’s a downside to
removing it from this KIP.

I have reworded the statement about temporarily. In practice, the 
implementation of this KIP that’s going on while the voting
progresses happens to use delta temporality, but that’s an implementation 
detail. Supporting clients must support both
temporalities.

I thought about exposing the client instance ID as a metric, but non-numeric 
metrics are not usual practice and tools
do not universally support them. I don’t think the KIP is improved by adding 
one now.

I have also added constants for the various Config classes for 
ENABLE_METRICS_PUSH_CONFIG, including to
StreamsConfig. It’s best to be explicit about this.

Thanks,
Andrew


On 2 Oct 2023, at 23:47, Matthias J. Sax  wrote:

Hi,

I did not pay attention to this KIP in the past; seems it was on-hold for a 
while.

Overall it sounds very useful, and I think we should extend this with a follow 
up KIP for Kafka Streams. What is unclear to me at this point is the statement:


Kafka Streams applications have an application.id configured and this 
identifier should be included as the application_id metrics label.


The `application.id` is currently only used as the (main) consumer's `group.id` 
(and is part of an auto-generated `client.id` if the user does not set one).

This comment related to:


The following labels should be added by the client as appropriate before 
metrics are pushed.


Given that Kafka Streams uses the consumer/producer/admin client as "black 
boxes", a client does at this point not know that it's part of a Kafka Streams 
application, and thus, it won't be able to attach any such label to the metrics it sends. 
(Also producer and admin don't even know the value of `application.id` -- only the (main) 
consumer, indirectly via `group.id`, but also restore and global consumer don't know it, 
because they don't have `group.id` set).

While I am totally in favor of the proposal, I am wondering how we intent to implement it 
in clean way? Or would we do ok to have some internal client APIs that KS can use to 
"register" itself with the client?




While clients must support both temporalities, the broker will initially only 
send GetTelemetrySubscriptionsResponse.DeltaTemporality=True


Not sure if I can follow. Who makes the decision about DELTA or CUMULATIVE metrics? Should 
the broker-side plugin not decide what metrics it wants to receive in which form? So what 
does "initially" mean -- the broker won't ship with a default plugin 
implementation?




The following method is added to the Producer, Consumer, and Admin client 
interfaces:


Should we add anything to Kafka Streams to expose the underlying clients' 
assigned client-instance-ids programmatically? I am also wondering if clients 
should report their assigned client-instance-ids as metrics itself (for this 
case, Kafka Streams won't need to do anything, because we already expose all 
client metrics).

If we add anything programmatic, we need to make it simple, given that Kafka 
Streams has many clients per `StreamThread` and may have multiple threads.




enable.metrics.push

It might be worth to add this to `StreamsConfig`, too? If set via 
StreamsConfig, we would forward it to all clients automatically.
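
As a sketch (assuming Streams simply forwards the flag to its embedded clients, and using the literal config name from the KIP since no StreamsConfig constant exists yet):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class MetricsPushConfigExample {
    static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // literal config name from the KIP; Streams would forward it to the
        // embedded producer/consumer/admin clients
        props.put("enable.metrics.push", "false");
        return props;
    }
}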




-Matthias


On 9/29/23 5:45 PM, David Jacot wrote:

Hi Andrew,

[jira] [Resolved] (KAFKA-15437) Add metrics about open iterators

2023-10-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15437.
-
Resolution: Duplicate

> Add metrics about open iterators
> 
>
> Key: KAFKA-15437
> URL: https://issues.apache.org/jira/browse/KAFKA-15437
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>            Reporter: Matthias J. Sax
>Priority: Major
>  Labels: need-kip
>
> Kafka Streams allows to create iterators over state stores. Those iterator 
> must get closed to free up resources (especially for RocksDB). – We regularly 
> get user reports of "resource leaks" that can be pinned down to leaking (ie 
> not-closed) iterators.
> To simplify monitoring, it would be helpful to add a metric about open 
> iterators to allow users to alert and pin-point the issue directly (and 
> before the actually resource leak is observed).
> We might want to have a DEBUG level per-store metric (to allow identifying 
> the store in question quickly), but an already rolled up INFO level metric 
> for the whole application.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-10-04 Thread Matthias J. Sax

I agree to (almost) everything what Bruno said.



In general, we tend to move away from using getters without "get", recently. So I would 
keep the "get".


This is new to me? Can you elaborate on this point? Why do you think 
that's the case?


I actually did realize (after Walker mentioned it) that existing query 
types use `get` prefix, but to me it seems that it was by accident and 
we should consider correcting it? Thus, I would actually prefer to not 
add the `get` prefix for new query-type methods.


IMHO, we should do a follow up KIP to deprecate all methods with `get` 
prefix and replace them with new ones without `get` -- it's of course 
always kinda "unnecessary" noise, but if we don't do it, we might get 
into more and more inconsistent naming which would result in a "bad" API.


If we indeed want to change the convention and use the `get` prefix, I 
would strongly advocate to bite the bullet and do a KIP to pro-actively add 
the `get` "everywhere" it's missing... But overall, it seems to be a 
much broader decision, and we should get buy in from many committers 
about it -- as long as there is no broad consensus to add `get` 
everywhere, I would strongly prefer not to diverge from the current 
agreement to omit `get`.




-Matthias




On 10/4/23 2:36 AM, Bruno Cadonna wrote:

Hi,

Regarding tombstones:
As far as I understand, we need to add either a validTo field to 
VersionedRecord or we need to return tombstones, otherwise the result is 
not complete, because users could never know a record was deleted at 
some point before the second non-null value was put.
I like more adding the validTo field since it makes the result more 
concise and easier interpretable.


Extending on Victoria's example, with the following puts

put(k, v1, time=0)
put(k, null, time=5)
put(k, null, time=10)
put(k, null, time=15)
put(k, v2, time=20)

the result with tombstones would be

value, timestamp
(v1, 0)
(null, 5)
(null, 10)
(null, 15)
(v2, 20)

instead of

value, timestamp, validTo
(v1, 0, 5)
(v2, 20, null)

The benefit of conciseness would already apply to one single tombstone.

On the other hand, why would somebody write consecutive tombstones into 
a versioned state store? I guess if somebody does that on purpose, then 
there should be a way to retrieve each of those tombstones, right?
So maybe we need both -- validTo field and the option to return 
tombstones. The latter might be moved to a future KIP in case we see the 
need.



Regarding .within(fromTs, toTs):
I would keep it simple with .from() and .asOfTimestamp() (or .until()). 
If we go with .within(), I would opt for .withinTimeRange(fromTs, toTs), 
because the query becomes more readable:


MultiVersionedKeyQuery
   .withKey(1)
   .withinTimeRange(Instant.parse("2023-08-03T10:37:30.00Z"), 
        Instant.parse("2023-08-04T10:37:30.00Z"))


If we stay with .from() and .until(), we should consider .fromTime() and 
.untilTime() (or .toTime()):


MultiVersionedKeyQuery
  .withKey(1)
  .fromTime(Instant.parse("2023-08-03T10:37:30.00Z"))
  .untilTime(Instant.parse("2023-08-04T10:37:30.00Z"))



Regarding asOf vs. until:
I think asOf() is more used in point in time queries as Walker mentioned 
where this KIP specifies a time range. IMO asOf() fits very well with 
KIP-960 where one version is queried, but here I think .until() fits 
better. That might just be a matter of taste and in the end I am fine 
with both as long as it is well documented.



Regarding getters without "get":
In the other IQv2 classes we used getters with "get". In general, we 
tend to move away from using getters without "get", recently. So I would 
keep the "get".



Best,
Bruno

On 10/3/23 7:49 PM, Walker Carlson wrote:

Hey Alieh thanks for the KIP,

Weighing in on the AsOf vs Until debate I think either is fine from a
natural language perspective. Personally AsOf makes more sense to me where
until gives me the idea that the query is making a change. It's totally a
connotative difference and not that important. I think as of is pretty
frequently used in point of time queries.

Also for these methods it makes sense to drop the "get". We don't
normally use that in getters:

    * The key that was specified for this query.
    */
   public K getKey();

   /**
    * The starting time point of the query, if specified
    */
   public Optional<Instant> getFromTimestamp();

   /**
    * The ending time point of the query, if specified
    */
   public Optional<Instant> getAsOfTimestamp();

Other than that I didn't have too much to add. Overall I like the direction
of the KIP and think the functionality is all there!
best,
Walker



On Mon, Oct 2, 2023 at 10:46 PM Matthias J. Sax  wrote:


Thanks for the updated KIP. Overall I like it.

Victoria raises a very good point, and I personally tend to prefer (I
believe so does Victoria, but it's not totally clear from her email) if
a range query would not return any tombstones

Re: [DISCUSS] KIP-985 Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-04 Thread Matthias J. Sax
require opening a rocksdb range scan** on multiple rocksdb instances (one
per partition), and polling the first key of each. Whether or not this is
ordered, could we please add that to the documentation?

**(How is this implemented/guaranteed in an `inMemoryKeyValueStore`? I
don't know about that implementation).

Colt McNealy

*Founder, LittleHorse.dev*


On Tue, Oct 3, 2023 at 1:35 PM Hanyu (Peter) Zheng
 wrote:


ok, I will update it. Thank you  Matthias

Sincerely,
Hanyu

On Tue, Oct 3, 2023 at 11:23 AM Matthias J. Sax <mj...@apache.org> wrote:


Thanks for the KIP Hanyu!

I took a quick look and I think the proposal makes sense overall.

A few comments about how to structure the KIP.

As you propose to not add `ReverseRangQuery` class, the code example
should go into "Rejected Alternatives" section, not in the "Proposed
Changes" section.

For the `RangeQuery` code example, please omit all existing methods etc,
and only include what will be added/changed. This makes it simpler to
read the KIP.


nit: typo

  the fault value is false

Should be "the default value is false".


Not sure if `setReverse()` is the best name. Maybe `withDescendingOrder`
(or similar, I guess `withReverseOrder` would also work) might be
better? Would be good to align to the KIP-969 proposal that suggests to use
`withDescendingKeys` methods for "reverse key-range"; if we go with
`withReverseOrder` we should change KIP-969 accordingly.

Curious to hear what others think about naming this consistently across
both KIPs.


-Matthias


On 10/3/23 9:17 AM, Hanyu (Peter) Zheng wrote:











https://cwiki.apache.org/confluence/display/KAFKA/KIP-985%3A+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2








[jira] [Commented] (KAFKA-15541) RocksDB Iterator Metrics

2023-10-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772072#comment-17772072
 ] 

Matthias J. Sax commented on KAFKA-15541:
-

Did not read the KIP yet, but sounds very similar to 
https://issues.apache.org/jira/browse/KAFKA-15437 – should we close K15437 as 
duplicate?

> RocksDB Iterator Metrics
> 
>
> Key: KAFKA-15541
> URL: https://issues.apache.org/jira/browse/KAFKA-15541
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Major
>  Labels: kip, kip-required
>
> [KIP-989: RocksDB Iterator 
> Metrics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics]
> RocksDB {{Iterators}} must be closed after use, to prevent memory leaks due 
> to [blocks being "pinned" 
> in-memory|https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#blocks-pinned-by-iterators].
>  Pinned blocks can currently be tracked via the per-store 
> {{block-cache-pinned-usage}} metric. However, it's common [(and even 
> recommended)|https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb]
>  to share the Block Cache among all stores in an application, to enable users 
> to globally bound native memory used by RocksDB. This results in the 
> {{block-cache-pinned-usage}} reporting the same memory usage for every store 
> in the application, irrespective of which store is actually pinning blocks in 
> the block cache.
> To aid users in finding leaked Iterators, as well as identifying the cause of 
> a high number of pinned blocks, we introduce two new metrics.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-985 Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-03 Thread Matthias J. Sax

Thanks for the KIP Hanyu!


I took a quick look and I think the proposal makes sense overall.

A few comments about how to structure the KIP.

As you propose to not add `ReverseRangQuery` class, the code example 
should go into "Rejected Alternatives" section, not in the "Proposed 
Changes" section.


For the `RangeQuery` code example, please omit all existing methods etc, 
and only include what will be added/changed. This makes it simpler to 
read the KIP.



nit: typo


 the fault value is false


Should be "the default value is false".


Not sure if `setReverse()` is the best name. Maybe `withDescendingOrder` 
(or similar, I guess `withReverseOrder` would also work) might be 
better? Would be good to align to the KIP-969 proposal that suggests to use 
`withDescendingKeys` methods for "reverse key-range"; if we go with 
`withReverseOrder` we should change KIP-969 accordingly.


Curious to hear what others think about naming this consistently across 
both KIPs.
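
Just to visualize the naming question, a hypothetical usage sketch (withDescendingOrder() is only one of the candidate names discussed above and does not exist yet, so this is illustrative only):

import org.apache.kafka.streams.query.RangeQuery;

public class DescendingRangeExample {
    // withRange() exists today; the ordering method is a candidate name only.
    static RangeQuery<String, Long> descendingRange() {
        return RangeQuery.<String, Long>withRange("a", "f").withDescendingOrder();
    }
}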



-Matthias


On 10/3/23 9:17 AM, Hanyu (Peter) Zheng wrote:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-985%3A+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2



Re: [DISCUSS] KIP-969: Support range interactive queries for versioned state stores

2023-10-02 Thread Matthias J. Sax

Thanks for updating the KIP.

Not sure if I agree or not with Bruno's idea to split the query types 
further? In the end, we split them only because there are three different 
return types: single value, value-iterator, key-value-iterator.


What do we gain by splitting out single-ts-range-key? In the end, for 
range-ts-range-key the proposed class is necessary and is a superset 
(one can set both timestamps to the same value, for single-ts lookup).


The mentioned simplification might apply to "single-ts-range-key" but I 
don't see a simplification for the proposed (and necessary) query type?


On the other hand, I see an advantage of a single-ts-range-key for 
querying over the "latest version" with a range of keys. For a 
single-ts-range-key query, this would be the default (similar to 
VersionedKeyQuery with no asOf-timestamp defined).


In the current version of the KIP, (if we agree that default should 
actually return "all versions" not "latest" -- this default was 
suggested by Bruno on KIP-968 and makes sense to me, so we would need to 
have the same default here to stay consistent), users would need to pass 
in `from(Long.MAX).to(Long.MAX)` (if I got this right) to query the 
latest point in time only, which seems to be clumsy? Or we could add a 
`latestKeyOnly` option to `MultiVersionedRangeQuery`, but it does seem 
a little clumsy, too.





The overall order of the returned records is by Key


I assume, results are returned by timestamp for each key. The KIP should 
be explicit about it.




To be very explicit, should we rename the methods to specify the key bound?

 - withRange -> withKeyRange
 - withLowerBound -> withLowerKeyBound
 - withUpperBound -> withUpperKeyBound
 - withNoBounds -> allKeys (or withNoKeyBounds, but we use 
`allVersions` and not `noTimeBound` and should align the naming?)




-Matthias


On 9/6/23 5:25 AM, Bruno Cadonna wrote:

Hi Alieh,

Thanks for the KIP!

One high level comment/question:

I assume you separated single key queries into two classes because 
versioned key queries return a single value and multi version key 
queries return iterators. Although, range queries always return 
iterators, it would make sense to also separate range queries for 
versioned state stores into range queries that return one single version 
of the keys within a range and range queries that return multiple 
versions of the keys within a range, IMO. That would reduce the 
meaningless combinations.

WDYT?

Best,
Bruno

On 8/16/23 8:01 PM, Alieh Saeedi wrote:

Hi all,

I split KIP-960 into three separate KIPs. Therefore, please continue discussions about range
interactive queries here. You can see all the addressed reviews on the
following page. Thanks in advance.

KIP-969: Support range interactive queries (IQv2) for versioned state 
stores



I look forward to your feedback!

Cheers,
Alieh



Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-10-02 Thread Matthias J. Sax

Thanks for the updated KIP. Overall I like it.

Victoria raises a very good point, and I personally tend to prefer (I 
believe so does Victoria, but it's not totally clear from her email) if 
a range query would not return any tombstones, ie, only two records in 
Victoria's example. Thus, it seems best to include a `validTo` ts-field 
to `VersionedRecord` -- otherwise, the retrieved result cannot be 
interpreted correctly.


Not sure what others think about it.

I would also be open to actually add a `includeDeletes()` (or 
`includeTombstones()`) method/flag (disabled by default) to allow users 
to get all tombstones: this would only be helpful if there are two 
consecutive tombstones though (if I got it right), so not sure if we want 
to add it or not -- it seems also possible to add it later if there is 
user demand for it, so it might be a premature addition at this point?



Nit:

the public interface ValueIterator is used 


"is used" -> "is added" (otherwise it sounds like as if `ValueIterator` 
exist already)




Should we also add a `.within(fromTs, toTs)` (or maybe some better 
name?) to allow specifying both bounds at once? The existing 
`RangeQuery` does the same for specifying the key-range, so might be 
good to add for time-range too?
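
Purely as a sketch of how that could read on the proposed query type (MultiVersionedKeyQuery and .within(...) are hypothetical here, not part of the KIP yet, so this would not compile against current APIs):

import java.time.Instant;

public class WithinExample {
    // Hypothetical fluent call that sets both time bounds at once.
    static MultiVersionedKeyQuery<String, Long> lastDayOfKey(final String key) {
        return MultiVersionedKeyQuery.<String, Long>withKey(key)
            .within(Instant.parse("2023-08-03T10:37:30.00Z"),
                    Instant.parse("2023-08-04T10:37:30.00Z"));
    }
}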




-Matthias


On 9/6/23 5:01 AM, Bruno Cadonna wrote:

In my last e-mail I missed to finish a sentence.

"I think from a KIP"

should be

"I think the KIP looks good!"


On 9/6/23 1:59 PM, Bruno Cadonna wrote:

Hi Alieh,

Thanks for the KIP!

I think from a KIP

1.
I propose to throw an IllegalArgumentException or an 
IllegalStateException for meaningless combinations. In any case, the 
KIP should specify what exception is thrown.


2.
Why does not specifying a range return the latest version? I would 
expect that it returns all versions since an empty lower or upper 
limit is interpreted as no limit.


3.
I second Matthias comment about replacing "asOf" with "until" or "to".

4.
Do we need "allVersions()"? As I said above I would return all 
versions if no limits are specified. I think if we get rid of 
allVersions() there might not be any meaningless combinations anymore.
If a user applies twice the same limit like for example 
MultiVersionedKeyQuery.with(key).from(t1).from(t2) the last one wins.


5.
Could you add some more examples with time ranges to the example section?

6.
The KIP misses the test plan section.

7.
I propose to rename the class to "MultiVersionKeyQuery" since we are 
querying multiple versions of the same key.


8.
Could you also add withAscendingTimestamps()? IMO it gives users the 
possibility to make their code more readable instead of only relying 
on the default.


Best,
Bruno


On 8/17/23 4:13 AM, Matthias J. Sax wrote:

Thanks for splitting this part into a separate KIP!

For `withKey()` we should be explicit that `null` is not allowed.

(Looking into existing `KeyQuery` it seems the JavaDocs don't cover 
this either -- would you like to do a tiny cleanup PR for this, or 
fix on-the-side in one of your PRs?)




The key query returns all the records that are valid in the time 
range starting from the timestamp {@code fromTimestamp}.


In the JavaDocs you use the phrase `are valid` -- I think we need to 
explain what "valid" means? It might even be worth to add some 
examples. It's annoying, but being precise is kinda important.


With regard to KIP-962, should we allow `null` for time bounds? The 
JavaDocs should also be explicit if `null` is allowed or not and what 
the semantics are if allowed.




You are using `asOf()` however, because we are doing time-range 
queries, to me using `until()` to describe the upper bound would 
sound better (I am not a native speaker though, so maybe I am off?)



The key query returns all the records that have timestamp <= {@code 
asOfTimestamp}.


This is only correct if no lower-bound is set, right?


In your reply to KIP-960 you mentioned:


the meaningless combinations are prevented by throwing exceptions.


We should add corresponding JavaDocs like:

    @throws IllegalArgumentException if {@code fromTimestamp} is 
equal or

 larger than {@code untilTimestamp}

Or something similar.


With regard to KIP-960: if we need to introduce a `VersionedKeyQuery` 
class for single-key-single-ts lookup, would we need to find a new 
name for the query class of this KIP, given that the return type is 
different?



-Matthias



On 8/16/23 10:57 AM, Alieh Saeedi wrote:

Hi all,

I split KIP-960
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+single-key_single-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores>
into three separate KIPs. Therefore, please continue discussions
about single-key, multi-timestamp interactive queries here. You can 
see all

the addressed reviews on the following page. Thanks in adva

Re: Kafka Streams reaching ERROR state during rolling upgrade / restart of brokers

2023-10-02 Thread Matthias J. Sax
I did mean client side...  If KS goes into ERROR state, it should log 
the reason.


If the logs are indeed empty, try to register an 
uncaught-exception-handler via


KafkaStreams#setUncaughtExceptionHandler(...)
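
A minimal sketch of registering such a handler (the chosen reaction is just an example):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class HandlerExample {
    // Register before calling streams.start(); this surfaces the root cause
    // instead of only observing the ERROR state later.
    static void register(final KafkaStreams streams) {
        streams.setUncaughtExceptionHandler(exception -> {
            exception.printStackTrace();
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        });
    }
}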


-Matthias

On 10/2/23 12:11 PM, Debraj Manna wrote:

Are you suggesting to check the Kafka broker logs? I do not see any other
errors logs on the client / application side.

On Fri, 29 Sep, 2023, 22:01 Matthias J. Sax,  wrote:


In general, Kafka Streams should keep running.

Can you inspect the logs to figure out why it's going into ERROR state
to begin with? Maybe you need to increase/change some timeouts/retries
configs.

The stack trace you shared, is a symptom, but not the root cause.

-Matthias

On 9/21/23 12:56 AM, Debraj Manna wrote:

I am using Kafka broker 2.8.1 (from AWS MSK) with Kafka clients and Kafka
stream 3.5.1.

I am observing that whenever some rolling upgrade is done on AWS MSK our
stream application reaches an error state. I get the below exception on
trying to query the state store

caused by: java.lang.IllegalStateException: KafkaStreams is not running.
State is ERROR.
  at


org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:381)

  at


org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1663)

  at


org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$2(InteractiveQueryService.java:227)


Can someone let me know what the recommended way we can keep the stream
application running whenever some rolling upgrade/restart of brokers is
done in the background?







Re: [DISCUSS] KIP-960: Support interactive queries (IQv2) for versioned state stores

2023-10-02 Thread Matthias J. Sax
ted.

About defining new methods in the VersionedKeyValueStore interface: I
actually have defined the required methods in the RocksDBVersionedStore
class. Since defining them for the interface requires implementing them
for all the classes that have implemented the interface.
Again a discussion for your other KIPs, but I think you'll want to define
the new method(s) in the VersionedKeyValueStore interface directly (rather
than only in individual implementations such as RocksDBVersionedStore),
otherwise your new interactive query types will throw NPEs for custom
store implementations which do not support the new methods.
Best,
Victoria

On Thursday, August 17, 2023 at 07:25:22 AM EDT, Alieh Saeedi  wrote:

   Hey Matthias,
thanks for the feedback

I think if one materializes a versioned store, then the query is posed to
the versioned state store. So the type of materialized store determines the
type of store and consequently all the classes for running the query (for
example, MeteredVersionedKeyValueStore instead of MeteredKeyValueStore and
so on). I added the piece of code for defining the versioned state store to
the example part of the KIP-960.

About the generics, using VersionedRecord instead of V worked. Right
now, I am composing the integration tests. Let me complete the code and
confirm it for 100%.

About the KeyQuery class, I thought the KIP must contain just the newly
added stuff. OK, I will paste the whole class in KIP-960.

Thanks,
Alieh




On Thu, Aug 17, 2023 at 3:54 AM Matthias J. Sax  wrote:



Thanks for updating the KIP and splitting into multiple ones. I am just
going to reply for the single-key-single-timestamp case below.

It seems the `KeyQuery.java` code snipped is "incomplete" -- the class
definition is missing.

At the same time, the example uses `VersionedKeyQuery` so I am not sure
right now if you propose to re-use the existing `KeyQuery` class or
introduce a new `VersionedKeyQuery` class?

While it was suggested that we re-use the existing `KeyQuery` class, I
am wondering what would happen if one uses the new `asOf` method, and
passes the query into a non-versioned store?

In the end, a non-versioned store does not know that there is an as-of
timestamp set and thus might just do a plain lookup (it also only has a
single value per key) and return whatever value it has stored?

I am wondering if this would be semantically questionable and/or
confusing for users (especially for timestamped stores)? -- Because the
non-versioned store does not know anything about the timestamp, it cannot
even check whether it's set and raise an error.


Did you try to prototype any of both approaches? Asking because I am
wondering about generics and return types? The existing `KeyQuery` is
defined as `KeyQuery<K, V> extends Query<V>`, so `V` is the result type.

However, for the versioned store we want the result type to be
`VersionedRecord<V>`, and thus we would need to set `V = VersionedRecord<V>`
-- would this work, or would the compiler tip over it (or would it work but
still be confusing/complex for users to specify the right types)?

For `VersionedKeyQuery` we could do:

`VersionedKeyQuery<K, V> extends Query<VersionedRecord<V>>`

which seems cleaner?
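
To make the comparison concrete, here is a minimal sketch of the two type
signatures under discussion (simplified stand-ins written for illustration,
not the actual classes from org.apache.kafka.streams.query):

{code:java}
import org.apache.kafka.streams.state.VersionedRecord;

// Simplified stand-ins for the query types discussed above; the real
// interfaces in org.apache.kafka.streams.query have more members.
interface Query<R> { }

// Option 1: reuse the existing KeyQuery shape and instantiate V as
// VersionedRecord<...>, e.g. KeyQuery<String, VersionedRecord<Long>> --
// workable, but the nested generic leaks into every call site.
class KeyQuery<K, V> implements Query<V> { }

// Option 2: a dedicated query type that fixes the result type to
// VersionedRecord<V>, so callers only spell out plain key/value types.
class VersionedKeyQuery<K, V> implements Query<VersionedRecord<V>> { }
{code}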

Without writing code I always have a hard time reasoning about generics,
so maybe trying out both approaches might shed some light?




-Matthias


On 8/15/23 9:03 AM, Alieh Saeedi wrote:

Hi all,
thanks to all for the great points you mentioned.

Addressed reviews are listed as follows:
1. The methods are defined as composable, as Lucas suggested. Now we have
even more types of single-key_multi-timestamp queries. As Matthias
suggested in his first review, now with composable methods, queries with a
lower time bound are also possible. The meaningless combinations are
prevented by throwing exceptions.
2. I corrected and replaced asOf everywhere instead of until. I hope the
javadocs and the explanations in the KIPs are clear enough about the time
range. Matthias, Lucas, and Victoria asked about the exact time boundaries.
I assumed that if the time range is specified as [t1, t2], all the records
that have been inserted within this time range must be returned by the
query. But I think the point that all of you referred to and that Victoria
clarified very well is valid. Maybe the query must return "all the records
that are valid within the time range". Therefore, records that have been
inserted before t1 are also returned. Now, this makes more sense to me as a
user. By the way, it seems more like a product question.
3. About the order of returned records, I added some boolean fields to the
classes to specify them. I still do not have any clue how hard the
implementation of this will be. The question is, is the order considered
for normal range queries as well?
4. As Victoria pointed out the issue about listing tombstones, I changed
the VersionedRecord such that it can have NULL values as well. The question
is, what was 

Re: [DISCUSS] KIP-714: Client metrics and observability

2023-10-02 Thread Matthias J. Sax

Hi,

I did not pay attention to this KIP in the past; seems it was on-hold 
for a while.


Overall it sounds very useful, and I think we should extend this with a 
follow up KIP for Kafka Streams. What is unclear to me at this point is 
the statement:



Kafka Streams applications have an application.id configured and this 
identifier should be included as the application_id metrics label.


The `application.id` is currently only used as the (main) consumer's 
`group.id` (and is part of an auto-generated `client.id` if the user 
does not set one).


This comment relates to:


The following labels should be added by the client as appropriate before 
metrics are pushed.


Given that Kafka Streams uses the consumer/producer/admin client as 
"black boxes", a client does at this point not know that it's part of a 
Kafka Streams application, and thus, it won't be able to attach any such 
label to the metrics it sends. (Also producer and admin don't even know 
the value of `application.id` -- only the (main) consumer, indirectly 
via `group.id`, but also restore and global consumer don't know it, 
because they don't have `group.id` set).


While I am totally in favor of the proposal, I am wondering how we 
intend to implement it in a clean way? Or would it be ok to have some 
internal client APIs that KS can use to "register" itself with the client?





While clients must support both temporalities, the broker will initially only 
send GetTelemetrySubscriptionsResponse.DeltaTemporality=True


Not sure if I can follow. How is the decision made about DELTA or 
CUMULATIVE metrics? Should the broker side plugin not decide what 
metrics it wants to receive in which form? So what does "initially" mean 
-- the broker won't ship with a default plugin implementation?





The following method is added to the Producer, Consumer, and Admin client 
interfaces:


Should we add anything to Kafka Streams to expose the underlying 
clients' assigned client-instance-ids programmatically? I am also 
wondering if clients should report their assigned client-instance-ids as 
metrics itself (for this case, Kafka Streams won't need to do anything, 
because we already expose all client metrics).


If we add anything programmatic, we need to make it simple, given that 
Kafka Streams has many clients per `StreamThread` and may have multiple 
threads.





enable.metrics.push
It might be worth adding this to `StreamsConfig`, too? If set via 
StreamsConfig, we would forward it to all clients automatically.
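
As a rough sketch of what that could look like from the application side
(whether Streams actually forwards this property to its embedded clients
would be up to the follow-up KIP; the property name itself comes from this
KIP, and the application id and bootstrap servers below are placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class MetricsPushConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

        // The idea: set it once here and have Kafka Streams pass it on to
        // the consumer, producer, and admin clients it creates internally.
        props.put("enable.metrics.push", "false");
    }
}
{code}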





-Matthias


On 9/29/23 5:45 PM, David Jacot wrote:

Hi Andrew,

Thanks for driving this one. I haven't read all the KIP yet but I already
have an initial question. In the Threading section, it is written
"KafkaConsumer: the "background" thread (based on the consumer threading
refactor which is underway)". If I understand this correctly, it means
that KIP-714 won't work if the "old consumer" is used. Am I correct?

Cheers,
David


On Fri, Sep 22, 2023 at 12:18 PM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:


Hi Philip,
No, I do not think it should actively search for a broker that supports
the new
RPCs. In general, either all of the brokers or none of the brokers will
support it.
In the window, where the cluster is being upgraded or client telemetry is
being
enabled, there might be a mixed situation. I wouldn’t put too much effort
into
this mixed scenario. As the client finds brokers which support the new
RPCs,
it can begin to follow the KIP-714 mechanism.

Thanks,
Andrew


On 22 Sep 2023, at 20:01, Philip Nee  wrote:

Hi Andrew -

Question on top of your answers: Do you think the client should actively
search for a broker that supports this RPC? As previously mentioned, the
broker uses the leastLoadedNode to find its first connection (am
I correct?), and what if that broker doesn't support the metric push?

P

On Fri, Sep 22, 2023 at 10:20 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:


Hi Kirk,
Thanks for your question. You are correct that the presence or absence of
the new RPCs in the ApiVersionsResponse tells the client whether to request
the telemetry subscriptions and push metrics.

This is of course tricky in practice. It would be conceivable, as a
cluster is upgraded to AK 3.7
or as a client metrics receiver plugin is deployed across the cluster,
that a client connects to some
brokers that support the new RPCs and some that do not.

Here’s my suggestion:
* If a client is not connected to any brokers that support the new RPCs,
it cannot push metrics.
* If a client is only connected to brokers that support the new RPCs, it
will use the new RPCs in accordance with the KIP.
* If a client is connected to some brokers that support the new RPCs and
some that do not, it will use the new RPCs with the supporting subset of
brokers in accordance with the KIP.

Comments?

Thanks,
Andrew


On 22 Sep 2023, at 16:01, Kirk True  wrote:

Hi Andrew/Jun,

I want to make sure I understand question/comment 

[jira] [Updated] (KAFKA-15527) Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15527:

Labels: kip  (was: )

> Add reverseRange and reverseAll query over kv-store in IQv2
> ---
>
> Key: KAFKA-15527
> URL: https://issues.apache.org/jira/browse/KAFKA-15527
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: kip
>
> Add reverseRange and reverseAll query over kv-store in IQv2



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15527) Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15527:

Component/s: streams

> Add reverseRange and reverseAll query over kv-store in IQv2
> ---
>
> Key: KAFKA-15527
> URL: https://issues.apache.org/jira/browse/KAFKA-15527
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>
> Add reverseRange and reverseAll query over kv-store in IQv2



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15520) Kafka Streams Stateful Aggregation Rebalancing causing processing to pause on all partitions

2023-09-29 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17770539#comment-17770539
 ] 

Matthias J. Sax commented on KAFKA-15520:
-

{quote}However, I'm seeing that when pods restart - it triggers rebalances and 
causes processing to be paused on all pods till the rebalance and state restore 
is in progress.
{quote}
and
{quote}I have increased session timeout to 480 seconds
{quote}
How long does a pod restart take? Is it quicker than 480 seconds? – If yes, no 
rebalance should be triggered. (`max.poll.interval.ms` config would not make a 
difference for this case)
{quote}My understanding is that even if there is a rebalance - only the 
partitions that should be moved around will be restored in a cooperative way 
and not pause all the processing.
{quote}
That's right. However, in KS if a thread gets a new task assigned for which 
state must be restored, KS explicitly pauses processing for all other tasks to 
put all work into restoring to reduce restore latency (in some versions of KS we 
tried to interleave processing of active tasks plus restoring, but there were 
complaints that it slows down restoring too much – with the new "state updater 
thread" we are adding, we aim to allow processing and restoring to happen in 
parallel again in future versions – maybe 3.7 if we can get it over the finish 
line).
{quote}Also, it should failover to standby replica in this case and avoid state 
restoring on other pods.
{quote}
Yes. Not sure why this does not happen, but you are using a somewhat older 
version of Kafka Streams – we put a lot of work into fixing bugs to this end, 
so it would be best to upgrade to 3.5 to see if the issues are already fixed.

> Kafka Streams Stateful Aggregation Rebalancing causing processing to pause on 
> all partitions
> 
>
> Key: KAFKA-15520
> URL: https://issues.apache.org/jira/browse/KAFKA-15520
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.2
>Reporter: Rohit Bobade
>Priority: Major
>
> Kafka broker version: 2.8.0 Kafka Streams client version: 2.6.2
> I am running kafka streams stateful aggregations on K8s statefulset with 
> persistent volume attached to each pod. I have also specified
> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, podName);
> which makes sure it gets the sticky partition assignment.
> Enabled standby replica - 
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> and set props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "0");
> However, I'm seeing that when pods restart - it triggers rebalances and 
> causes processing to be paused on all pods till the rebalance and state 
> restore is in progress.
> My understanding is that even if there is a rebalance - only the partitions 
> that should be moved around will be restored in a cooperative way and not 
> pause all the processing. Also, it should failover to standby replica in this 
> case and avoid state restoring on other pods.
> I have increased session timeout to 480 seconds and max poll interval to 15 
> mins to minimize rebalances.
> Also added
> props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> CooperativeStickyAssignor.class.getName());
> to enable CooperativeStickyAssignor
> could someone please help if I'm missing something?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15491) RackId doesn't exist error while running WordCountDemo

2023-09-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15491.
-
Fix Version/s: 3.6.1
   3.7.0
 Assignee: Hao Li
   Resolution: Fixed

> RackId doesn't exist error while running WordCountDemo
> --
>
> Key: KAFKA-15491
> URL: https://issues.apache.org/jira/browse/KAFKA-15491
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Luke Chen
>Assignee: Hao Li
>Priority: Major
> Fix For: 3.6.1, 3.7.0
>
>
> While running the WordCountDemo following the 
> [docs|https://kafka.apache.org/documentation/streams/quickstart], I saw the 
> following error logs in the stream application output. Though everything 
> still works fine, it'd be better there are no ERROR logs in the demo app.
> {code:java}
> [2023-09-24 14:15:11,723] ERROR RackId doesn't exist for process 
> e2391098-23e8-47eb-8d5e-ff6e697c33f5 and consumer 
> streams-wordcount-e2391098-23e8-47eb-8d5e-ff6e697c33f5-StreamThread-1-consumer-adae58be-f5f5-429b-a2b4-67bf732726e8
>  
> (org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor)
> [2023-09-24 14:15:11,757] ERROR RackId doesn't exist for process 
> e2391098-23e8-47eb-8d5e-ff6e697c33f5 and consumer 
> streams-wordcount-e2391098-23e8-47eb-8d5e-ff6e697c33f5-StreamThread-1-consumer-adae58be-f5f5-429b-a2b4-67bf732726e8
>  
> (org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] New Kafka PMC Member: Justine Olshan

2023-09-29 Thread Matthias J. Sax

Congrats!

On 9/25/23 7:29 AM, Rajini Sivaram wrote:

Congratulations, Justine!

Regards,

Rajini

On Mon, Sep 25, 2023 at 9:40 AM Lucas Brutschy
 wrote:


Congrats, Justine!

On Mon, Sep 25, 2023 at 9:20 AM Bruno Cadonna  wrote:


Congrats, Justine! Well deserved!

Best,
Bruno

On 9/25/23 5:28 AM, ziming deng wrote:

Congratulations Justine!



On Sep 25, 2023, at 00:01, Viktor Somogyi-Vass <

viktor.somo...@cloudera.com.INVALID> wrote:


Congrats Justine!

On Sun, Sep 24, 2023, 17:45 Kirk True  wrote:


Congratulations Justine! Thanks for all your great work!


On Sep 24, 2023, at 8:37 AM, John Roesler 

wrote:


Congratulations, Justine!
-John

On Sun, Sep 24, 2023, at 05:05, Mickael Maison wrote:

Congratulations Justine!

On Sun, Sep 24, 2023 at 5:04 AM Sophie Blee-Goldman
 wrote:


Congrats Justine!

On Sat, Sep 23, 2023, 4:36 PM Tom Bentley 

wrote:



Congratulations!

On Sun, 24 Sept 2023 at 12:32, Satish Duggana <

satish.dugg...@gmail.com>

wrote:


Congratulations Justine!!

On Sat, 23 Sept 2023 at 15:46, Bill Bejeck 

wrote:


Congrats Justine!

-Bill

On Sat, Sep 23, 2023 at 6:23 PM Greg Harris



wrote:


Congratulations Justine!

On Sat, Sep 23, 2023 at 5:49 AM Boudjelda Mohamed Said
 wrote:


Congrats Justin !

On Sat 23 Sep 2023 at 14:44, Randall Hauch 


wrote:



Congratulations, Justine!

On Sat, Sep 23, 2023 at 4:25 AM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:


Congrats Justine!

On Sat, Sep 23, 2023, 13:28 Divij Vaidya <

divijvaidy...@gmail.com>

wrote:



Congratulations Justine!

On Sat 23. Sep 2023 at 07:06, Chris Egerton <

fearthecel...@gmail.com>

wrote:


Congrats Justine!
On Fri, Sep 22, 2023, 20:47 Guozhang Wang <

guozhang.wang...@gmail.com>

wrote:


Congratulations!

On Fri, Sep 22, 2023 at 8:44 PM Tzu-Li (Gordon) Tai <

tzuli...@apache.org


wrote:


Congratulations Justine!

On Fri, Sep 22, 2023, 19:25 Philip Nee <

philip...@gmail.com>

wrote:



Congrats Justine!

On Fri, Sep 22, 2023 at 7:07 PM Luke Chen <

show...@gmail.com>

wrote:



Hi, Everyone,

Justine Olshan has been a Kafka committer since

Dec.

2022.

She

has

been

very active and instrumental to the community since

becoming

a

committer.

It's my pleasure to announce that Justine is now a

member of

Kafka

PMC.


Congratulations Justine!

Luke
on behalf of Apache Kafka PMC































Re: [ANNOUNCE] New committer: Yash Mayya

2023-09-29 Thread Matthias J. Sax

Congrats!

On 9/26/23 1:30 AM, Mayank Shekhar Narula wrote:

Congrats Yash!

On Fri, Sep 22, 2023 at 5:24 PM Mickael Maison 
wrote:


Congratulations Yash!

On Fri, Sep 22, 2023 at 9:25 AM Chaitanya Mukka
 wrote:


Congrats, Yash!! Well deserved.

Chaitanya Mukka
On 21 Sep 2023 at 8:58 PM +0530, Bruno Cadonna ,

wrote:

Hi all,

The PMC of Apache Kafka is pleased to announce a new Kafka committer
Yash Mayya.

Yash's major contributions are around Connect.

Yash authored the following KIPs:

KIP-793: Allow sink connectors to be used with topic-mutating SMTs
KIP-882: Kafka Connect REST API configuration validation timeout
improvements
KIP-970: Deprecate and remove Connect's redundant task configurations
endpoint
KIP-980: Allow creating connectors in a stopped state

Overall, Yash is known for insightful and friendly input to discussions
and his high quality contributions.

Congratulations, Yash!

Thanks,

Bruno (on behalf of the Apache Kafka PMC)







Re: [ANNOUNCE] New committer: Lucas Brutschy

2023-09-29 Thread Matthias J. Sax

Congrats!

On 9/26/23 1:29 AM, Mayank Shekhar Narula wrote:

Congratulations Lucas!

On Fri, Sep 22, 2023 at 5:24 PM Mickael Maison 
wrote:


Congratulations Lucas!

On Fri, Sep 22, 2023 at 7:13 AM Luke Chen  wrote:


Congratulations, Lucas!

Luke

On Fri, Sep 22, 2023 at 6:53 AM Tom Bentley  wrote:


Congratulations!

On Fri, 22 Sept 2023 at 09:11, Sophie Blee-Goldman <

ableegold...@gmail.com



wrote:


Congrats Lucas!










Re: Kafka Streams reaching ERROR state during rolling upgrade / restart of brokers

2023-09-29 Thread Matthias J. Sax

In general, Kafka Streams should keep running.

Can you inspect the logs to figure out why it's going into ERROR state 
to begin with? Maybe you need to increase/change some timeouts/retries 
configs.
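
As a sketch of the kind of tuning meant here (the config value, application
id, bootstrap servers, and the choice of handler response below are
illustrative placeholders, not recommendations):

{code:java}
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class RollingUpgradeTuningSketch {
    public static KafkaStreams build(Topology topology) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

        // Give tasks more time to ride out broker restarts before a timeout
        // is surfaced to the application.
        props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, Duration.ofMinutes(5).toMillis());

        KafkaStreams streams = new KafkaStreams(topology, props);

        // Replace the failed stream thread on an unexpected exception
        // instead of letting the whole instance transition to ERROR.
        streams.setUncaughtExceptionHandler(
            exception -> StreamThreadExceptionResponse.REPLACE_THREAD);

        return streams;
    }
}
{code}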


The stack trace you shared is a symptom, but not the root cause.

-Matthias

On 9/21/23 12:56 AM, Debraj Manna wrote:

I am using Kafka broker 2.8.1 (from AWS MSK) with Kafka clients and Kafka
stream 3.5.1.

I am observing that whenever some rolling upgrade is done on AWS MSK our
stream application reaches an error state. I get the below exception on
trying to query the state store

caused by: java.lang.IllegalStateException: KafkaStreams is not running.
State is ERROR.
 at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:381)
 at org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1663)
 at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$2(InteractiveQueryService.java:227)

Can someone let me know what the recommended way we can keep the stream
application running whenever some rolling upgrade/restart of brokers is
done in the background?



Re: Can a message avoid loss occur in Kafka

2023-09-29 Thread Matthias J. Sax
For the config you provide, data loss should not happen (as long as you 
don't allow for unclean leader election, which is disabled by default).


But you might be subject to unavailability for some partitions if a 
broker fails.
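
For reference, a minimal producer-side sketch of the settings this thread
is about (broker/topic settings such as replication factor,
min.insync.replicas, and unclean.leader.election.enable are configured on
the cluster, not here; serializers are omitted and the class name is my own):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class NoDataLossProducerConfigSketch {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

        // Wait until all in-sync replicas have acknowledged each write.
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        // Retry transient failures instead of dropping the record.
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

        // Avoid duplicates introduced by those retries.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        return props;
    }
}
{code}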



-Matthias

On 9/17/23 7:49 AM, 陈近南 wrote:

Hello,
Can message loss be avoided in Kafka? For example, my config is:


Producer
retries = Integer.MAX_VALUE
request.required.acks=-1


Broker
replication.factor >= 2
min.insync.replicas > 1
log.flush.interval.messages=1


Consumer
enable.auto.commit = false

Can message loss be avoided in Kafka? If it cannot, why? And is there another MQ that can avoid it?



Best regards,
Chen



[jira] [Updated] (KAFKA-15463) StreamsException: Accessing from an unknown node

2023-09-13 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15463:

Priority: Major  (was: Blocker)

>  StreamsException: Accessing from an unknown node
> -
>
> Key: KAFKA-15463
> URL: https://issues.apache.org/jira/browse/KAFKA-15463
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.1
>Reporter: Yevgeny
>Priority: Major
>
> After some time application was working fine, starting to get:
>  
> This is springboot application runs in kubernetes as stateful pod.
>  
>  
>  
> {code:java}
>   Exception in thread 
> "-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" 
> org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown 
> node at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162)
>  at myclass1.java:28) at myclass2.java:48) at 
> java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90) at 
> java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602)
>  at 
> java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>  at 
> java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637)
>  at myclass3.java:48) at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
>  at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
>  at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:780)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:780)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:711)
>  at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
>  at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)
>    {code}
>  
> stream-thread 
> [-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1] State 
> transition from PENDING_SHUTDOWN to DEAD
>  
>  
> Transformer is Prototype bean, the supplier supplys new instance of the 
> Transformer:
>  
>  
> {code:java}
> @Override public Transformer> get() 
> {     return ctx.getBean(MyTransformer.class); }{code}
>  
>  
> The only way to recover is to delete all topics used by kafkastreams, even if 
> application restarted same exception is thrown.
> *If messages in internal topics of 'store-changelog'  are deleted/offset 
> manipulated, can it cause the issue?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: FW: UNSUBSCRIBE

2023-09-12 Thread Matthias J. Sax

To unsubscribe, you need to send an email to

  users-unsubscr...@storm.apache.org.

Bests,
  -Matthias


On 9/12/23 6:37 AM, hmm0403 wrote:


UNSUBSCRIBE


Sent from my Galaxy


 Original message 
From: Michele Volpe 
Date: 23/9/12 10:26 PM (GMT+09:00)
To: user@storm.apache.org
Subject: Re: UNSUBSCRIBE

UNSUBSCRIBE

On Tue, Sep 12, 2023, 13:05 yuz989 > wrote:





[jira] [Created] (KAFKA-15443) Upgrade RocksDB dependency

2023-09-07 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15443:
---

 Summary: Upgrade RocksDB dependency
 Key: KAFKA-15443
 URL: https://issues.apache.org/jira/browse/KAFKA-15443
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams currently depends on RocksDB 7.9.2

However, the latest version of RocksDB is already 8.5.3. We should check the 
RocksDB release notes to see what benefits we get to upgrade to the latest 
version (and file corresponding tickets to exploit improvement of newer 
releases as applicable).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15417:
---

Assignee: Victor van den Hoven

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Assignee: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However if I use a joinWindow with *before = time-difference and after = 0* 
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with 
> {_}joinWindow{_}.{_}after=0 and joinWindow.before=30s{_}, any new message on 
> stream 1 that can not be joined with any messages on stream2 should be joined 
> with a null-record after the _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the 
> previous message will be joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

