[jira] [Resolved] (KAFKA-15170) CooperativeStickyAssignor cannot adjust assignment correctly

2024-05-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-15170.

Resolution: Fixed

> CooperativeStickyAssignor cannot adjust assignment correctly
> 
>
> Key: KAFKA-15170
> URL: https://issues.apache.org/jira/browse/KAFKA-15170
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0
>Reporter: li xiangyuan
>Assignee: li xiangyuan
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> AbstractStickyAssignor uses ConstrainedAssignmentBuilder to build the assignment 
> when all consumers in the group subscribe to the same topic list, but it fails to 
> add every partition that moves to another consumer to 
> ``partitionsWithMultiplePreviousOwners``.
>  
> The reason is that assignOwnedPartitions does not add partitions whose rack 
> mismatches their previous owner's to allRevokedPartitions, and only partitions in 
> that list are then added to partitionsWithMultiplePreviousOwners.
>  
> In a cooperative rebalance, partitions that have changed owner must be removed 
> from the final assignment or they will lead to incorrect consume behavior. I have 
> already raised a PR, please take a look, thanks:
>  
> [https://github.com/apache/kafka/pull/13965]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16758) Extend Consumer#close with option to leave the group or not

2024-05-13 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-16758:
--

 Summary: Extend Consumer#close with option to leave the group or 
not
 Key: KAFKA-16758
 URL: https://issues.apache.org/jira/browse/KAFKA-16758
 Project: Kafka
  Issue Type: New Feature
  Components: consumer
Reporter: A. Sophie Blee-Goldman


See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the full 
context.

Essentially we would get rid of the "internal.leave.group.on.close" config that 
is used as a backdoor by Kafka Streams right now to prevent closed consumers 
from leaving the group, thus reducing unnecessary task movements after a simple 
bounce. 

This would be replaced by an actual public API that would allow the caller to 
opt in or out of leaving the group when close is called. This would be similar to 
the KafkaStreams#close(CloseOptions) API, and in fact would be how that API 
will be implemented (since it currently only works for static groups, as 
noted in KAFKA-16514).
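
For reference, a rough sketch of how this could look, modeled on the existing KafkaStreams#close(CloseOptions) API mentioned above; the consumer-side names are placeholders pending the KIP discussion:

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.KafkaStreams;

public class LeaveGroupOnClose {
    // The Streams-side model that the proposed consumer option would mirror:
    // the caller decides at close time whether the member should leave the group.
    static void shutdown(final KafkaStreams streams, final boolean scalingDown) {
        final KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions()
                .timeout(Duration.ofSeconds(30))
                .leaveGroup(scalingDown); // leave only when the node is going away for good
        streams.close(options);
    }

    // Hypothetical consumer-side equivalent (placeholder names, pending the KIP):
    // consumer.close(ConsumerCloseOptions.timeout(Duration.ofSeconds(30)).leaveGroup(false));
}
{code}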

This has several benefits over the current situation:
 # It allows plain consumer apps to opt-out of leaving the group when closed, 
which is currently not possible through any public API (only an internal 
backdoor config)
 # It enables the caller to dynamically select the appropriate action depending 
on why the client is being closed – for example, you would not want the 
consumer to leave the group during a simple restart, but would want it to leave 
the group when shutting down the app or if scaling down the node. This is not 
possible today, even with the internal config, since configs are immutable
 # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so 
that the user's choice to leave the group during close will be respected for 
non-static members



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-02-26 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-16277.

Resolution: Fixed

> CooperativeStickyAssignor does not spread topics evenly among consumer group
> 
>
> Key: KAFKA-16277
> URL: https://issues.apache.org/jira/browse/KAFKA-16277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Cameron Redpath
>Priority: Major
> Fix For: 3.8.0
>
> Attachments: image-2024-02-19-13-00-28-306.png
>
>
> Consider the following scenario:
> `topic-1`: 12 partitions
> `topic-2`: 12 partitions
>  
> Of note, `topic-1` gets approximately 10 times more messages through it than 
> `topic-2`. 
>  
> Both of these topics are consumed by a single application, single consumer 
> group, which scales under load. Each member of the consumer group subscribes 
> to both topics. The `partition.assignment.strategy` being used is 
> `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The 
> application may start with one consumer. It consumes all partitions from both 
> topics.
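> Roughly this setup in code, for illustration only (broker address, group id, and 
> serdes are made up, not taken from our deployment):
> {code:java}
> import java.util.Arrays;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.serialization.StringDeserializer;
>
> public class TwoTopicConsumer {
>     public static void main(String[] args) {
>         final Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, "two-topic-group");
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>         // the cooperative-sticky assignor under discussion
>         props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
>                 CooperativeStickyAssignor.class.getName());
>
>         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
>             // every member subscribes to both topics
>             consumer.subscribe(Arrays.asList("topic-1", "topic-2"));
>             // poll loop omitted
>         }
>     }
> }
> {code}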
>  
> The problem begins when the application scales up to two consumers. What is 
> seen is that all partitions from `topic-1` go to one consumer, and all 
> partitions from `topic-2` go to the other consumer. In the case with one 
> topic receiving more messages than the other, this results in a very 
> imbalanced group where one consumer is receiving 10x the traffic of the other 
> due to partition assignment.
>  
> This is the issue being seen in our cluster at the moment. See this graph of 
> the number of messages being processed by each consumer as the group scales 
> from one to four consumers:
> !image-2024-02-19-13-00-28-306.png|width=537,height=612!
> Things to note from this graphic:
>  * With two consumers, the partitions for a topic all go to a single consumer 
> each
>  * With three consumers, the partitions for a topic are split between two 
> consumers each
>  * With four consumers, the partitions for a topic are split between three 
> consumers each
>  * The total number of messages being processed by each consumer in the group 
> is very imbalanced throughout the entire period
>  
> With regard to the number of _partitions_ being assigned to each consumer, 
> the group is balanced. However, the assignment appears to be biased so that 
> partitions from the same topic go to the same consumer. In our scenario, this 
> leads to very undesirable partition assignment.
>  
> I question if the behaviour of the assignor should be revised, so that each 
> topic has its partitions maximally spread across all available members of the 
> consumer group. In the above scenario, this would result in much more even 
> distribution of load. The behaviour would then be:
>  * With two consumers, 6 partitions from each topic go to each consumer
>  * With three consumers, 4 partitions from each topic go to each consumer
>  * With four consumers, 3 partitions from each topic go to each consumer
>  
> Of note, we only saw this behaviour after migrating to the 
> `CooperativeStickyAssignor`. It was not an issue with the default partition 
> assignment strategy.
>  
> It is possible this may be intended behaviour. In which case, what is the 
> preferred workaround for our scenario? Our current workaround if we decide to 
> go ahead with the update to `CooperativeStickyAssignor` may be to limit our 
> consumers so they only subscribe to one topic, and have two consumer threads 
> per instance of the application.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15782) Establish concrete project conventions to define public APIs that require a KIP

2023-11-02 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-15782:
--

 Summary: Establish concrete project conventions to define public 
APIs that require a KIP
 Key: KAFKA-15782
 URL: https://issues.apache.org/jira/browse/KAFKA-15782
 Project: Kafka
  Issue Type: Improvement
Reporter: A. Sophie Blee-Goldman


There seems to be no concrete definition that establishes project-specific 
conventions for what is and is not considered a public API change that requires 
a KIP. This results in frequent drawn-out debates that revisit the same topic 
and slow things down, and often ends up forcing trivial changes through the KIP 
process. For a recent example, KIP-998 was required for a one-line change just 
to add the "protected" access modifier to an otherwise package-private class. 
See [this comment 
thread|https://github.com/apache/kafka/pull/14681#discussion_r1378591228] for 
the full debate on this subject.

It would be beneficial and in the long run save us all time to just sit down 
and hash out the project conventions, such as whether a 
package-private/protected method on a non-final java class is to be considered 
a public API, even if the method itself is/was never a public method. This will 
of course require a KIP, but should help to establish some ground rules to 
avoid any more superfluous KIPs in the future



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15781) Change ProducerConfig(props, doLog) constructor to protected

2023-11-02 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-15781:
--

 Summary: Change ProducerConfig(props, doLog) constructor to 
protected
 Key: KAFKA-15781
 URL: https://issues.apache.org/jira/browse/KAFKA-15781
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Reporter: A. Sophie Blee-Goldman
Assignee: A. Sophie Blee-Goldman


See https://github.com/apache/kafka/pull/14681



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15116) Kafka Streams processing blocked during rebalance

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-15116.

Resolution: Not A Problem

> Kafka Streams processing blocked during rebalance
> -
>
> Key: KAFKA-15116
> URL: https://issues.apache.org/jira/browse/KAFKA-15116
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0
>Reporter: David Gammon
>Priority: Major
>
> We have a Kafka Streams application that simply takes a message, processes 
> it, and then produces an event out the other side. The complexity is that 
> there is a requirement that all events with the same partition key must be 
> committed before the next message is processed.
> This works flawlessly most of the time, but we have started to see problems 
> during deployments where the first message blocks the second message during a 
> rebalance, because the first message isn’t committed before the second message 
> is processed. This ultimately results in transactions timing out and more 
> rebalancing.
> We’ve tried lots of configuration to get the behaviour we require with no 
> luck. We’ve now put in a temporary fix so that Kafka Streams works with our 
> framework but it feels like this might be a missing feature or potentially a 
> bug.
> +Example+
> Given:
>  * We have two messages (InA and InB).
>  * Both messages have the same partition key.
>  * A rebalance is in progress so streams is no longer able to commit.
> When:
>  # Message InA -> processor -> OutA (not committed)
>  # Message InB -> processor -> blocked because #1 has not been committed



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-12550) Introduce RESTORING state to the KafkaStreams FSM

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12550.

Resolution: Won't Fix

Closing this out since its usefulness is preempted by the StateUpdaterThread 
and by having moved restoration out of the main StreamThread.

> Introduce RESTORING state to the KafkaStreams FSM
> -
>
> Key: KAFKA-12550
> URL: https://issues.apache.org/jira/browse/KAFKA-12550
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> We should consider adding a new state to the KafkaStreams FSM: RESTORING
> This would cover the time between the completion of a stable rebalance and 
> the completion of restoration across the client. Currently, Streams will 
> report the state during this time as REBALANCING, even though it is generally 
> spending much more time restoring than rebalancing.
> There are a few motivations/benefits behind this idea:
> # Observability is a big one: using the umbrella REBALANCING state to cover 
> all aspects of rebalancing -> task initialization -> restoring has been a 
> common source of confusion in the past. It’s also proved to be a time sink 
> for us, during escalations, incidents, mailing list questions, and bug 
> reports. It often adds latency to escalations in particular as we have to go 
> through GTS and wait for the customer to clarify whether their “Kafka Streams 
> is stuck rebalancing” ticket means that it’s literally rebalancing, or just 
> in the REBALANCING state and actually stuck elsewhere in Streams
> # Prereq for global thread improvements: for example [KIP-406: 
> GlobalStreamThread should honor custom reset policy 
> |https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy]
>  was ultimately blocked on this as we needed to pause the Streams app while 
> the global thread restored from the appropriate offset. Since there’s 
> absolutely no rebalancing involved in this case, piggybacking on the 
> REBALANCING state would just be shooting ourselves in the foot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15463) StreamsException: Accessing from an unknown node

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-15463.

Resolution: Not A Problem

>  StreamsException: Accessing from an unknown node
> -
>
> Key: KAFKA-15463
> URL: https://issues.apache.org/jira/browse/KAFKA-15463
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.1
>Reporter: Yevgeny
>Priority: Major
>
> The application was working fine for some time, then started to get the exception below.
>  
> This is a Spring Boot application that runs in Kubernetes as a stateful pod.
>  
>  
>  
> {code:java}
>   Exception in thread 
> "-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" 
> org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown 
> node at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162)
>  at myclass1.java:28) at myclass2.java:48) at 
> java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90) at 
> java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602)
>  at 
> java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>  at 
> java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637)
>  at myclass3.java:48) at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
>  at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
>  at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:780)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:780)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:711)
>  at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
>  at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)
>    {code}
>  
> stream-thread 
> [-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1] State 
> transition from PENDING_SHUTDOWN to DEAD
>  
>  
> The Transformer is a prototype bean; the supplier supplies a new instance of the 
> Transformer:
>  
>  
> {code:java}
> @Override
> public Transformer<...> get() {
>     return ctx.getBean(MyTransformer.class);
> }
> {code}
>  
>  
> The only way to recover is to delete all topics used by Kafka Streams; even if the 
> application is restarted, the same exception is thrown.
> If messages in the internal 'store-changelog' topics are deleted or offsets are 
> manipulated, can that cause the issue?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15571) StateRestoreListener#onRestoreSuspended is never called because wrapper DelegatingStateRestoreListener doesn't implement onRestoreSuspended

2023-10-11 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-15571.

Resolution: Fixed

> StateRestoreListener#onRestoreSuspended is never called because wrapper 
> DelegatingStateRestoreListener doesn't implement onRestoreSuspended
> ---
>
> Key: KAFKA-15571
> URL: https://issues.apache.org/jira/browse/KAFKA-15571
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0, 3.6.0, 3.5.1
>Reporter: Levani Kokhreidze
>Assignee: Levani Kokhreidze
>Priority: Major
>
> With https://issues.apache.org/jira/browse/KAFKA-10575, 
> `StateRestoreListener#onRestoreSuspended` was added. But local tests show 
> that it is never called, because `DelegatingStateRestoreListener` was not 
> updated to call the new method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15215) The default.dsl.store config is not compatible with custom state stores

2023-07-18 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-15215:
--

 Summary: The default.dsl.store config is not compatible with 
custom state stores
 Key: KAFKA-15215
 URL: https://issues.apache.org/jira/browse/KAFKA-15215
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: A. Sophie Blee-Goldman
Assignee: Almog Gavra


Sort of a bug, sort of a new/missing feature. When we added the long-awaited 
default.dsl.store config, it was decided to scope the initial KIP to just the 
two out-of-the-box state store types offered by Streams, rocksdb and 
in-memory. The reasoning was that this would address a large number of the 
relevant use cases, and could always be followed up with another KIP for custom 
state stores if/when the demand arose.

Of course, since rocksdb is the default anyways, the only beneficiaries of this 
KIP right now are the people who specifically want only in-memory stores – yet 
custom state store users are probably by far the ones with the greatest need 
for an easier way to configure the store type across an entire application. And 
unfortunately, because the config currently relies on enum definitions for the 
known OOTB store types, there's not really any way to extend this feature as it 
stands to work with custom implementations.
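
As a reminder of the current shape of the feature, a minimal sketch of what the config covers today, using the existing StreamsConfig constants (only the built-in store types can be selected):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class DefaultDslStoreConfig {
    public static void main(String[] args) {
        final Properties props = new Properties();
        // Only the two out-of-the-box store types can be selected today; there is
        // no value that points the DSL at a custom store implementation.
        props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY);
        // props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.ROCKS_DB); // the default
    }
}
{code}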

I think this is a great feature, which is why I hope to see it extended to the 
broader user base. Most likely we'll want to introduce a new config for this, 
though whether it replaces the old default.dsl.store config or complements it 
will have to be decided during the KIP discussion



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15045) Move Streams task assignor to public configs

2023-05-31 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-15045:
--

 Summary: Move Streams task assignor to public configs
 Key: KAFKA-15045
 URL: https://issues.apache.org/jira/browse/KAFKA-15045
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: A. Sophie Blee-Goldman
Assignee: A. Sophie Blee-Goldman






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable

2023-05-08 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14976:
--

 Summary: Left/outer stream-stream joins create KV stores that 
aren't customizable
 Key: KAFKA-14976
 URL: https://issues.apache.org/jira/browse/KAFKA-14976
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


It appears that we only give the illusion of full customizability when it comes 
to the state stores of a windowed join. This arose due to an 
[optimization|https://github.com/apache/kafka/pull/11252] for the performance 
of the spurious results fix, and means that these joins now come with one 
additional, and possibly unexpected, state store:

 
{code:java}
final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder =
            new ListValueStoreBuilder<>(
                persistent
                    ? Stores.persistentKeyValueStore(storeName)   // <-- this built-in store
                    : Stores.inMemoryKeyValueStore(storeName),    // <-- or this one
                timestampedKeyAndJoinSideSerde,
                leftOrRightValueSerde,
                Time.SYSTEM
            ); {code}
 

where persistent is defined above that as
{code:java}
final boolean persistent = streamJoinedInternal.thisStoreSupplier() == null || 
streamJoinedInternal.thisStoreSupplier().get().persistent(); {code}
 
This means that regardless of whether a custom state store implementation was passed 
in to the join, we will still insert one of our RocksDB or InMemory state 
stores, which might be very surprising since the API makes it seem like the 
underlying stores are fully configurable.

I'm adding a warning line for this in PR 
[#13682|https://github.com/apache/kafka/pull/13682/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R334-R336]
 but we should really make this hidden state store fully configurable like the 
window stores currently are (which will require a KIP)
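
To make the surprise concrete, a minimal sketch of a windowed join that passes custom in-memory window store suppliers via StreamJoined; even so, the extra shared key-value store described above is still created by Streams (topic names, serdes, and window sizes are illustrative only):

{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.state.Stores;

public class CustomizedStreamJoin {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> left = builder.stream("left-topic");
        final KStream<String, String> right = builder.stream("right-topic");

        // Join window of +/- 5 minutes with no grace: the window stores must use a
        // matching size and retention (before + after = 10 minutes)
        final Duration timeDifference = Duration.ofMinutes(5);
        final Duration storeSize = Duration.ofMinutes(10);

        left.leftJoin(
                right,
                (l, r) -> l + (r == null ? "" : r),
                JoinWindows.ofTimeDifferenceWithNoGrace(timeDifference),
                StreamJoined.<String, String, String>with(Serdes.String(), Serdes.String(), Serdes.String())
                        .withThisStoreSupplier(Stores.inMemoryWindowStore("left-store", storeSize, storeSize, true))
                        .withOtherStoreSupplier(Stores.inMemoryWindowStore("right-store", storeSize, storeSize, true)))
            .to("join-output");

        // Even with both suppliers customized above, the left-join bookkeeping store is
        // still one of the built-in persistent/in-memory KV stores chosen by Streams.
        builder.build();
    }
}
{code}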

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14650) IQv2 can throw ConcurrentModificationException when accessing Tasks

2023-01-24 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14650:
--

 Summary: IQv2 can throw ConcurrentModificationException when 
accessing Tasks 
 Key: KAFKA-14650
 URL: https://issues.apache.org/jira/browse/KAFKA-14650
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.4.0
Reporter: A. Sophie Blee-Goldman


From failure in 
*[PositionRestartIntegrationTest.verifyStore[cache=false, log=true, supplier=IN_MEMORY_WINDOW, kind=PAPI]|https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.4/63/testReport/junit/org.apache.kafka.streams.integration/PositionRestartIntegrationTest/Build___JDK_11_and_Scala_2_13___verifyStore_cache_false__log_true__supplier_IN_MEMORY_WINDOW__kind_PAPI_/]*
java.util.ConcurrentModificationException
at 
java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208)
at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244)
at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239)
at java.base/java.util.HashMap.putMapEntries(HashMap.java:508)
at java.base/java.util.HashMap.putAll(HashMap.java:781)
at 
org.apache.kafka.streams.processor.internals.Tasks.allTasksPerId(Tasks.java:361)
at 
org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1537)
at 
org.apache.kafka.streams.processor.internals.StreamThread.allTasks(StreamThread.java:1278)
at org.apache.kafka.streams.KafkaStreams.query(KafkaStreams.java:1921)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.iqv2WaitForResult(IntegrationTestUtils.java:168)
at 
org.apache.kafka.streams.integration.PositionRestartIntegrationTest.shouldReachExpectedPosition(PositionRestartIntegrationTest.java:438)
at 
org.apache.kafka.streams.integration.PositionRestartIntegrationTest.verifyStore(PositionRestartIntegrationTest.java:423)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13602) Allow to broadcast a result record

2022-12-29 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-13602.

Resolution: Fixed

> Allow to broadcast a result record
> --
>
> Key: KAFKA-13602
> URL: https://issues.apache.org/jira/browse/KAFKA-13602
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip, newbie++
> Fix For: 3.4.0
>
>
> From time to time, users ask how they can send a record to more than one 
> partition in a sink topic. Currently, this is only possible by replicating the 
> message N times before the sink and using a custom partitioner to write the N 
> messages into the N different partitions.
> It might be worth making this easier by adding a new feature for it. There are 
> multiple options:
>  * extend `to()` / `addSink()` with a "broadcast" option/config
>  * add `toAllPartitions()` / `addBroadcastSink()` methods
>  * allow StreamPartitioner to return `-1` for "all partitions"
>  * extend `StreamPartitioner` to allow returning more than one partition (i.e. 
> a list/collection of integers instead of a single int)
> The first three options imply that only a "full broadcast" is supported, so 
> they are less flexible. On the other hand, they are easier to use (especially the 
> first two options, as they do not require implementing a custom 
> partitioner).
> The last option would be the most flexible and would also allow for a "partial 
> broadcast" (aka multi-cast) pattern. It might also be possible to combine two 
> options, or maybe even a totally different idea.
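> A rough sketch of what the last option could look like (illustrative only, 
> hypothetical names, not a final API proposal):
> {code:java}
> import java.util.Set;
>
> // Hypothetical extension of StreamPartitioner that can return several target
> // partitions for a single record.
> public interface MultiCastStreamPartitioner<K, V> {
>     Set<Integer> partitions(String topic, K key, V value, int numPartitions);
> }
>
> // Example: a partial broadcast (multi-cast) that writes every record to the
> // first two partitions of the sink topic.
> class FirstTwoPartitions<K, V> implements MultiCastStreamPartitioner<K, V> {
>     @Override
>     public Set<Integer> partitions(String topic, K key, V value, int numPartitions) {
>         return numPartitions > 1 ? Set.of(0, 1) : Set.of(0);
>     }
> }
> {code}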



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14539) Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

2022-12-20 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14539:
--

 Summary: Simplify StreamsMetadataState by replacing the Cluster 
metadata with partition info map
 Key: KAFKA-14539
 URL: https://issues.apache.org/jira/browse/KAFKA-14539
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman


We can clean up the StreamsMetadataState class a bit by removing the #onChange 
invocation that currently occurs within StreamsPartitionAssignor#assign, which 
then lets us remove the `Cluster` parameter in that callback. Instead of 
building a fake Cluster object from the map of partition info when we invoke 
#onChange inside the StreamsPartitionAssignor#onAssignment method, we can just 
directly pass in the partition info map and replace the 
usage of `Cluster` everywhere in StreamsMetadataState.

(I believe the current system is a historical artifact from when we used to 
require passing in a {{Cluster}} for the default partitioning strategy, which 
the StreamMetadataState needs to compute the partition for a key. At some point 
in the past we provided a better way to get the default partition, so we no 
longer need a {{Cluster}} parameter/field at all)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-13602) Allow to broadcast a result record

2022-12-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reopened KAFKA-13602:


Reverting in 3.4 due to logging-related perf/security issue, and some open 
semantic questions. Retargeting for 3.5

> Allow to broadcast a result record
> --
>
> Key: KAFKA-13602
> URL: https://issues.apache.org/jira/browse/KAFKA-13602
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip, newbie++
> Fix For: 3.4.0
>
>
> From time to time, users ask how they can send a record to more than one 
> partition in a sink topic. Currently, this is only possible by replicating the 
> message N times before the sink and using a custom partitioner to write the N 
> messages into the N different partitions.
> It might be worth making this easier by adding a new feature for it. There are 
> multiple options:
>  * extend `to()` / `addSink()` with a "broadcast" option/config
>  * add `toAllPartitions()` / `addBroadcastSink()` methods
>  * allow StreamPartitioner to return `-1` for "all partitions"
>  * extend `StreamPartitioner` to allow returning more than one partition (i.e. 
> a list/collection of integers instead of a single int)
> The first three options imply that only a "full broadcast" is supported, so 
> they are less flexible. On the other hand, they are easier to use (especially the 
> first two options, as they do not require implementing a custom 
> partitioner).
> The last option would be the most flexible and would also allow for a "partial 
> broadcast" (aka multi-cast) pattern. It might also be possible to combine two 
> options, or maybe even a totally different idea.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives

2022-12-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-13736.

Resolution: Fixed

> Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
> ---
>
> Key: KAFKA-13736
> URL: https://issues.apache.org/jira/browse/KAFKA-13736
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Blocker
>  Labels: flakey, flaky-test
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.lang.AssertionError: receiveRequest timed out
>   at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483)
>   at 
> kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-4852) ByteBufferSerializer not compatible with offsets

2022-12-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-4852.
---
Resolution: Fixed

> ByteBufferSerializer not compatible with offsets
> 
>
> Key: KAFKA-4852
> URL: https://issues.apache.org/jira/browse/KAFKA-4852
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
> Environment: all
>Reporter: Werner Daehn
>Assignee: LinShunkang
>Priority: Minor
> Fix For: 3.4.0
>
>
> Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the 
> ByteBuffer was created with an offset, e.g. ByteBuffer.wrap(data, 3, 10)? The 
> ByteBufferSerializer will send from pos=0 and not from pos=3 onwards.
> Solution: No rewind() but flip() for reading a ByteBuffer. That's what the 
> flip is meant for.
> Story:
> Imagine the incoming data comes from a byte[], e.g. a network stream 
> containing topicname, partition, key, value, ... and you want to create a new 
> ProducerRecord for that. As the constructor of ProducerRecord requires 
> (topic, partition, key, value) you have to copy from above byte[] the key and 
> value. That means there is a memcopy taking place. Since the payload can be 
> potentially large, that introduces a lot of overhead. Twice the memory.
> A nice solution to this problem is to simply wrap the network byte[] into new 
> ByteBuffers:
> ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength);
> ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength);
> and then use the ByteBufferSerializer instead of the ByteArraySerializer.
> But that does not work, as the ByteBufferSerializer does a rewind(), hence 
> both key and value will start at position=0 of the data[].
> {code:java}
> public class ByteBufferSerializer implements Serializer<ByteBuffer> {
>     public byte[] serialize(String topic, ByteBuffer data) {
>         data.rewind();
> {code}
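> A small standalone illustration of why rewind() loses the offset of a wrapped 
> buffer (plain java.nio semantics, not Kafka code):
> {code:java}
> import java.nio.ByteBuffer;
>
> public class RewindVsOffset {
>     public static void main(String[] args) {
>         byte[] data = {10, 11, 12, 13, 14, 15};
>         // wrap a 3-byte view starting at offset 2: position=2, limit=5
>         ByteBuffer wrapped = ByteBuffer.wrap(data, 2, 3);
>         System.out.println(wrapped.remaining()); // 3 -> the intended payload, bytes 2..4
>
>         wrapped.rewind();                        // position -> 0, limit stays 5
>         System.out.println(wrapped.remaining()); // 5 -> the serializer would now emit bytes 0..4
>     }
> }
> {code}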



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14291) KRaft: ApiVersionsResponse doesn't have finalizedFeatures and finalizedFeatureEpoch in KRaft mode

2022-12-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-14291.

Resolution: Duplicate

Marking this as a duplicate of KAFKA-13990 as described above

> KRaft: ApiVersionsResponse doesn't have finalizedFeatures and 
> finalizedFeatureEpoch in KRaft mode
> -
>
> Key: KAFKA-14291
> URL: https://issues.apache.org/jira/browse/KAFKA-14291
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Akhilesh Chaganti
>Priority: Critical
>
> https://github.com/apache/kafka/blob/d834947ae7abc8a9421d741e742200bb36f51fb3/core/src/main/scala/kafka/server/ApiVersionManager.scala#L53
> ```
> class SimpleApiVersionManager(
>   val listenerType: ListenerType,
>   val enabledApis: collection.Set[ApiKeys],
>   brokerFeatures: Features[SupportedVersionRange]
> ) extends ApiVersionManager {
>   def this(listenerType: ListenerType) = {
> this(listenerType, ApiKeys.apisForListener(listenerType).asScala, 
> BrokerFeatures.defaultSupportedFeatures())
>   }
>   private val apiVersions = 
> ApiVersionsResponse.collectApis(enabledApis.asJava)
>   override def apiVersionResponse(requestThrottleMs: Int): 
> ApiVersionsResponse = {
> ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, 
> apiVersions, brokerFeatures)
>   }
> }
> ```
> ApiVersionManager for KRaft doesn't add the finalizedFeatures and 
> finalizedFeatureEpoch to the ApiVersionsResponse.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14460) In-memory store iterators can return results with null values

2022-12-09 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14460:
--

 Summary: In-memory store iterators can return results with null 
values
 Key: KAFKA-14460
 URL: https://issues.apache.org/jira/browse/KAFKA-14460
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


Due to the thread-safety model we adopted in our in-memory stores to avoid 
scaling issues, we synchronize all read/write methods and then during range 
scans, copy the keyset of all results rather than returning a direct iterator 
over the underlying map. When users call #next to read out the iterator 
results, we issue a point lookup on the next key and then simply return a new 
KeyValue<>(key, get(key))

This lets the range scan return results without blocking access to the store by 
other threads and without risk of ConcurrentModification, as a writer can 
modify the real store without affecting the keyset copy of the iterator. This 
also means that those changes won't be reflected in what the iterator sees or 
returns, which in itself is fine as we don't guarantee consistency semantics of 
any kind.

However, we _do_ guarantee that range scans "must not return null values" – and 
this contract may be violated if the StreamThread deletes a record that the 
iterator was going to return.

tl;dr we should check get(key) for null and skip to the next result if 
necessary in the in-memory store iterators. See for example 
InMemoryKeyValueIterator (note that we'll probably need to buffer one record in 
advance before we return true from #hasNext)
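
A minimal sketch of that buffered-lookahead idea (hypothetical names and shape, not the actual InMemoryKeyValueIterator):

{code:java}
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Buffer one entry ahead so hasNext() can skip keys whose values were deleted
// by the StreamThread after the keyset copy was taken.
final class SkippingIterator<K, V> implements Iterator<Map.Entry<K, V>> {
    private final Iterator<K> keys;
    private final Function<K, V> lookup;   // point lookup into the live store
    private Map.Entry<K, V> buffered;

    SkippingIterator(final Iterator<K> keys, final Function<K, V> lookup) {
        this.keys = keys;
        this.lookup = lookup;
        advance();
    }

    private void advance() {
        buffered = null;
        while (keys.hasNext()) {
            final K key = keys.next();
            final V value = lookup.apply(key);
            if (value != null) {           // skip null values to honor the contract
                buffered = Map.entry(key, value);
                return;
            }
        }
    }

    @Override public boolean hasNext() { return buffered != null; }

    @Override public Map.Entry<K, V> next() {
        if (buffered == null) throw new NoSuchElementException();
        final Map.Entry<K, V> result = buffered;
        advance();
        return result;
    }
}
{code}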



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14459) Document how to use and close a 'Statistics' in the example RocksDBConfigSetter

2022-12-09 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14459:
--

 Summary: Document how to use and close a 'Statistics' in the 
example RocksDBConfigSetter
 Key: KAFKA-14459
 URL: https://issues.apache.org/jira/browse/KAFKA-14459
 Project: Kafka
  Issue Type: Improvement
  Components: docs, streams
Reporter: A. Sophie Blee-Goldman


We fixed a memory leak in KAFKA-14432 where we were sometimes failing to close 
the `Statistics` object used for rocksdb metrics. Since users can define their 
own Statistics as well, we should make sure they know that this has to be 
closed like we do for other `RocksDBObject` classes like the Cache. It might 
also be useful to provide an example of how to use Statistics and what can be 
done with it.
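
Something along these lines is what the example config setter could show (an illustrative sketch, not the final doc text):

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;
import org.rocksdb.Statistics;

public class StatisticsConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // attach a user-defined Statistics so rocksdb collects per-store metrics
        final Statistics statistics = new Statistics();
        options.setStatistics(statistics);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Statistics is a native RocksDB object and must be closed, just like a Cache,
        // or it will leak memory (see KAFKA-14432)
        final Statistics statistics = options.statistics();
        if (statistics != null) {
            statistics.close();
        }
    }
}
{code}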

 

We currently have two sample RocksDBConfigSetter implementations in the docs, 
both of which could be updated here:
 # [rocksdb memory management 
docs](https://kafka.apache.org/documentation/streams/developer-guide/memory-mgmt.html#rocksdb)
 -- consider including a Statistics in this example to highlight that it needs 
to be closed? This one could arguably be skipped, although the formatting of 
this sample config setter seems to be messed up so this might be a good 
opportunity to fix that on the side
 # [rocksdb.config.setter config 
docs](https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id20):
 this would be a good place to include an example that actually uses the 
Statistics for something (assuming there's some reason for users to define 
their own Statistics in the first place, which I personally do not know). We 
can potentially link to this example from the metrics docs



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14454) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions passes when run individu

2022-12-09 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-14454.

Resolution: Fixed

> KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
>  passes when run individually but not when is run as part of the IT
> --
>
> Key: KAFKA-14454
> URL: https://issues.apache.org/jira/browse/KAFKA-14454
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> The newly added test 
> KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
>  (added as part of KIP-837) passes when run individually but fails when run as part of 
> the IT class, and hence is marked as Ignored.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14432) RocksDBStore relies on finalizers to not leak memory

2022-12-07 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-14432.

Resolution: Fixed

> RocksDBStore relies on finalizers to not leak memory
> 
>
> Key: KAFKA-14432
> URL: https://issues.apache.org/jira/browse/KAFKA-14432
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Blocker
> Fix For: 3.4.0
>
>
> Relying on finalizers in RocksDB has been deprecated for a long time, and 
> starting with rocksdb 7, finalizers are removed completely (see 
> [https://github.com/facebook/rocksdb/pull/9523]). 
> Kafka Streams currently relies on finalizers in parts to not leak memory. 
> This needs to be resolved before we can upgrade to RocksDB 7.
> See  [https://github.com/apache/kafka/pull/12809] .
> This is a native heap profile after running Kafka Streams without finalizers 
> for a few hours:
> {code:java}
> Total: 13547.5 MB
> 12936.3 95.5% 95.5% 12936.3 95.5% rocksdb::port::cacheline_aligned_alloc
> 438.5 3.2% 98.7% 438.5 3.2% rocksdb::BlockFetcher::ReadBlockContents
> 84.0 0.6% 99.3% 84.2 0.6% rocksdb::Arena::AllocateNewBlock
> 45.9 0.3% 99.7% 45.9 0.3% prof_backtrace_impl
> 8.1 0.1% 99.7% 14.6 0.1% rocksdb::BlockBasedTable::PutDataBlockToCache
> 6.4 0.0% 99.8% 12941.4 95.5% Java_org_rocksdb_Statistics_newStatistics___3BJ
> 6.1 0.0% 99.8% 6.9 0.1% rocksdb::LRUCacheShard::Insert@2d8b20
> 5.1 0.0% 99.9% 6.5 0.0% rocksdb::VersionSet::ProcessManifestWrites
> 3.9 0.0% 99.9% 3.9 0.0% rocksdb::WritableFileWriter::WritableFileWriter
> 3.2 0.0% 99.9% 3.2 0.0% std::string::_Rep::_S_create{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14260) InMemoryKeyValueStore iterator still throws ConcurrentModificationException

2022-12-06 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-14260.

Resolution: Fixed

> InMemoryKeyValueStore iterator still throws ConcurrentModificationException
> ---
>
> Key: KAFKA-14260
> URL: https://issues.apache.org/jira/browse/KAFKA-14260
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.1, 3.2.3
>Reporter: Avi Cherry
>Assignee: Lucia Cerchie
>Priority: Major
> Fix For: 3.4.0
>
>
> This is the same bug as KAFKA-7912 which was then re-introduced by KAFKA-8802.
> Any iterator returned from {{InMemoryKeyValueStore}} may end up throwing a 
> ConcurrentModificationException because the backing map is not concurrent 
> safe. I expect that this only happens when the store is retrieved from 
> {{KafkaStreams.store()}} from outside of the topology since any usage of the 
> store from inside of the topology should be naturally single-threaded.
> To start off, a reminder that this behaviour explicitly violates the 
> interface contract for {{ReadOnlyKeyValueStore}} which states
> {quote}The returned iterator must be safe from 
> java.util.ConcurrentModificationExceptions
> {quote}
> It is often complicated to make code to demonstrate concurrency bugs, but 
> thankfully it is trivial to reason through the source code in 
> {{InMemoryKeyValueStore.java}} to show why this happens:
>  * All of the InMemoryKeyValueStore methods that return iterators do so by 
> passing a keySet based on the backing TreeMap to the InMemoryKeyValueIterator 
> constructor.
>  * These keySets are all VIEWS of the backing map, not copies.
>  * The InMemoryKeyValueIterator then makes a private copy of the keySet by 
> passing the original keySet into the constructor for TreeSet. This copying 
> was implemented in KAFKA-8802, incorrectly intending it to fix the 
> concurrency problem.
>  * TreeSet then iterates over the keySet to make a copy. If the original 
> backing TreeMap in InMemoryKeyValueStore is changed while this copy is being 
> created, it will fail fast with a ConcurrentModificationException (see the 
> sketch below).
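> A tiny standalone demonstration of that last step (plain JDK collections, not 
> Streams code; being a race, the exception may not fire on every run):
> {code:java}
> import java.util.ConcurrentModificationException;
> import java.util.TreeMap;
> import java.util.TreeSet;
>
> public class KeySetCopyCme {
>     public static void main(String[] args) throws InterruptedException {
>         final TreeMap<Integer, Integer> map = new TreeMap<>();
>         for (int i = 0; i < 1_000_000; i++) {
>             map.put(i, i);
>         }
>         // mutate the map while another thread copies its keySet view
>         final Thread writer = new Thread(() -> map.put(-1, -1));
>         writer.start();
>         try {
>             new TreeSet<>(map.keySet()); // copying iterates the live view -> may fail fast
>         } catch (final ConcurrentModificationException e) {
>             System.out.println("copy failed fast: " + e);
>         }
>         writer.join();
>     }
> }
> {code}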
> This bug should be able to be trivially fixed by replacing the backing 
> TreeMap with a ConcurrentSkipListMap but here's the rub:
> This bug has already been found in KAFKA-7912 and the TreeMap was replaced 
> with a ConcurrentSkipListMap. It was then reverted back to a TreeMap in 
> KAFKA-8802 because of the performance regression. I can [see from one of the 
> PRs|https://github.com/apache/kafka/pull/7212/commits/384c12e40f3a59591f897d916f92253e126820ed]
>  that it was believed the concurrency problem with the TreeMap implementation 
> was fixed by copying the keyset when the iterator is created but the problem 
> remains, plus the fix creates an extra copy of the iterated portion of the 
> set in memory.
> For what it's worth, the performance difference between TreeMap and 
> ConcurrentSkipListMap does not extend into complexity. TreeMap enjoys a similar 
> ~2x speed through all operations with any size of data, but at the cost of 
> what turned out to be an easy-to-encounter bug.
> This is all unfortunate since the only time the state stores ever get 
> accessed concurrently is through the `KafkaStreams.store()` mechanism, but I 
> would imagine that "correct and slightly slower" is better than "incorrect 
> and faster".
> Too bad BoilerBay's AirConcurrentMap is closed-source and patented.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13602) Allow to broadcast a result record

2022-12-06 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-13602.

Resolution: Fixed

> Allow to broadcast a result record
> --
>
> Key: KAFKA-13602
> URL: https://issues.apache.org/jira/browse/KAFKA-13602
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip, newbie++
> Fix For: 3.4.0
>
>
> From time to time, users ask how they can send a record to more than one 
> partition in a sink topic. Currently, this is only possible by replicating the 
> message N times before the sink and using a custom partitioner to write the N 
> messages into the N different partitions.
> It might be worth making this easier by adding a new feature for it. There are 
> multiple options:
>  * extend `to()` / `addSink()` with a "broadcast" option/config
>  * add `toAllPartitions()` / `addBroadcastSink()` methods
>  * allow StreamPartitioner to return `-1` for "all partitions"
>  * extend `StreamPartitioner` to allow returning more than one partition (i.e. 
> a list/collection of integers instead of a single int)
> The first three options imply that only a "full broadcast" is supported, so 
> they are less flexible. On the other hand, they are easier to use (especially the 
> first two options, as they do not require implementing a custom 
> partitioner).
> The last option would be the most flexible and would also allow for a "partial 
> broadcast" (aka multi-cast) pattern. It might also be possible to combine two 
> options, or maybe even a totally different idea.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14415) ThreadCache is getting slower with every additional state store

2022-12-06 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-14415.

Resolution: Fixed

> ThreadCache is getting slower with every additional state store
> ---
>
> Key: KAFKA-14415
> URL: https://issues.apache.org/jira/browse/KAFKA-14415
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
> Fix For: 3.4.0
>
>
> There are a few lines in `ThreadCache` that I think should be optimized. 
> `sizeBytes` is called at least once, and potentially many times, in every 
> `put`, and is linear in the number of caches (= number of state stores, so 
> typically proportional to the number of tasks). That means that with every 
> additional task, every put gets a little slower. Compare the throughput of 
> TIME_ROCKS on trunk (green graph):
> [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-3-4-0-51b7eb7937-jenkins-20221113214104-streamsbench/]
> The throughput of TIME_ROCKS is 20% higher when a constant-time 
> `sizeBytes` implementation is used:
> [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-LUCASCOMPARE-lucas-20221122140846-streamsbench/]
> The same seems to apply for the MEM backend (initial throughput >8000 instead 
> of 6000), however, I cannot run the same benchmark here because the memory is 
> filled too quickly.
> [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-LUCASSTATE-lucas-20221121231632-streamsbench/]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14444) Simplify user experience of customizing partitioning strategy in Streams

2022-12-05 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14444:
--

 Summary: Simplify user experience of customizing partitioning 
strategy in Streams
 Key: KAFKA-14444
 URL: https://issues.apache.org/jira/browse/KAFKA-14444
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: A. Sophie Blee-Goldman


The current process of plugging a custom partitioning scheme into a Streams 
application is fairly involved and extremely error prone. While defining their 
topology, users must pay close attention to every operator/node that may be 
connected to, or may create, a topic that will be produced to, or else print out 
their topology description and try to locate all sink nodes that way. If 
they miss passing their custom partitioner to one or more such locations in 
the topology, everything downstream will be affected by the 
inconsistent/unintended partitioning scheme.
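
For context, this is the sort of thing users have to remember to do today at every sink and repartition point (an illustrative fragment; `myPartitioner` stands for some user-supplied StreamPartitioner implementation):

{code:java}
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.StreamPartitioner;

public class CustomPartitioningToday {
    // The same partitioner must be handed to every operator that writes to a topic;
    // missing a single sink or repartition topic silently breaks the scheme downstream.
    static void wireSinks(final KStream<String, String> stream,
                          final StreamPartitioner<String, String> myPartitioner) {
        stream
            .repartition(Repartitioned.<String, String>as("rekeyed").withStreamPartitioner(myPartitioner))
            .to("output-topic", Produced.<String, String>as("sink").withStreamPartitioner(myPartitioner));
    }
}
{code}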

It can also be easy for users to miss this process entirely and try to 
customize the partitioning scheme via the producer config. This does not work, 
and unfortunately results in a runtime exception that's difficult for users to 
interpret. Ideally we would provide a similar config for Streams where users 
could define a default implementation of the StreamPartitioner interface.

...unfortunately, this is not so straightforward. Unlike the case of the 
Producer config, where there is a clearly defined key and value type, there's 
no guarantee each sink node requiring the custom partitioner deals with the 
same key/value type as the others.

We could utilize the default.key/value configs for this, and only require users 
to plug in their partitioner where the key/value types differ from the default, 
but this would likely limit the usefulness of a default partitioner 
significantly. We could push this to the user to write a generic implementation 
class with type checking and handling, but this would be pretty awkward and 
error prone as well.

Either way this will take some thought, which is why the idea was pulled from 
the proposal in KIP-878 and left for a follow-up KIP



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14406) Double iteration of records in batches to be restored

2022-11-18 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-14406.

Resolution: Fixed

> Double iteration of records in batches to be restored
> -
>
> Key: KAFKA-14406
> URL: https://issues.apache.org/jira/browse/KAFKA-14406
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 3.4.0
>
>
> While restoring a batch of records, {{RocksDBStore}} was iterating the 
> {{{}ConsumerRecord{}}}s, building a list of {{{}KeyValue{}}}s, and then 
> iterating _that_ list of {{{}KeyValue{}}}s to add them to the RocksDB batch.
> Simply adding the key and value directly to the RocksDB batch prevents this 
> unnecessary second iteration, and the creation of itermediate {{KeyValue}} 
> objects, improving the performance of state restoration, and reducing 
> unnecessary object allocation.
> (thanks to Nick Telford for finding this)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14406) Double iteration of records in batches to be restored

2022-11-18 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14406:
--

 Summary: Double iteration of records in batches to be restored
 Key: KAFKA-14406
 URL: https://issues.apache.org/jira/browse/KAFKA-14406
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman
 Fix For: 3.4.0


While restoring a batch of records, {{RocksDBStore}} was iterating the 
{{{}ConsumerRecord{}}}s, building a list of {{{}KeyValue{}}}s, and then 
iterating _that_ list of {{{}KeyValue{}}}s to add them to the RocksDB batch.

Simply adding the key and value directly to the RocksDB batch prevents this 
unnecessary second iteration, and the creation of itermediate {{KeyValue}} 
objects, improving the performance of state restoration, and reducing 
unnecessary object allocation.

(thanks to Nick Telford for finding this)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14405) Log a warning when users attempt to set a config controlled by Streams

2022-11-18 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14405:
--

 Summary: Log a warning when users attempt to set a config 
controlled by Streams
 Key: KAFKA-14405
 URL: https://issues.apache.org/jira/browse/KAFKA-14405
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


Related to https://issues.apache.org/jira/browse/KAFKA-14404

It's too easy for users to try overriding one of the client configs that 
Streams hardcodes, and since we just silently ignore it there's no good way for 
them to tell their config is not being used. Sometimes this may be harmless but 
in cases like the Producer's partitioner, there could be important application 
logic that's never being invoked.

When processing user configs in StreamsConfig, we should check for all these 
configs and log a warning when any of them have been set



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14404) Improve dWarn that the ProducerConfig partitioner.class cannot be used in Streams

2022-11-18 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14404:
--

 Summary: Improve dWarn that the ProducerConfig partitioner.class 
cannot be used in Streams
 Key: KAFKA-14404
 URL: https://issues.apache.org/jira/browse/KAFKA-14404
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


There are a handful of client configs that can't be set by Streams users for 
various reasons, such as the group id, but we seem to have missed a few of them 
in the documentation 
[here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]:
 the partition assignor (Consumer) and partitioner (Producer).

This section of the docs also just needs to be cleaned up in general as there 
is overlap between the [Default 
Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values]
 and [Parameters controlled by Kafka 
Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]
 sections, and the table of contents is messed up presumably due to an issue 
with the section headers.

We should separate these with one section covering (only) configs where Streams 
sets a different default but this can still be overridden by the user, and the 
other section covering the configs that Streams hardcodes and users can never 
override.
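For illustration only, here is the distinction those two sections would capture, using the producer configs as an example (the partitioner class name below is made up, and the claim that the override is ignored is per this ticket and KAFKA-14405):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
// a Streams-tuned default that users may still override:
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "200");
// a config controlled by Streams -- setting it has no effect today:
props.put(StreamsConfig.producerPrefix(ProducerConfig.PARTITIONER_CLASS_CONFIG), "com.example.MyPartitioner");
{code}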



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14385) Flaky Test QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable

2022-11-11 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14385:
--

 Summary: Flaky Test 
QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable
 Key: KAFKA-14385
 URL: https://issues.apache.org/jira/browse/KAFKA-14385
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


Failed twice on the same build (Java 8 & 11)
h3. Stacktrace

java.lang.AssertionError: KafkaStreams did not transit to RUNNING state within 
15000 milli seconds. Expected:  but: was  at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(StreamsTestUtils.java:134)
 at 
org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(StreamsTestUtils.java:121)
 at 
org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable(QueryableStateIntegrationTest.java:1038)

 

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12836/3/testReport/org.apache.kafka.streams.integration/QueryableStateIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldNotMakeStoreAvailableUntilAllStoresAvailable/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14384) Flaky Test SelfJoinUpgradeIntegrationTest.shouldUpgradeWithTopologyOptimizationOff

2022-11-11 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14384:
--

 Summary: Flaky Test 
SelfJoinUpgradeIntegrationTest.shouldUpgradeWithTopologyOptimizationOff
 Key: KAFKA-14384
 URL: https://issues.apache.org/jira/browse/KAFKA-14384
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


h3. Stacktrace

java.lang.AssertionError: Did not receive all 5 records from topic 
selfjoin-outputSelfJoinUpgradeIntegrationTestshouldUpgradeWithTopologyOptimizationOff
 within 6 ms Expected: is a value equal to or greater than <5> but: <0> was 
less than <5> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueWithTimestampRecordsReceived$2(IntegrationTestUtils.java:763)
 at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:382) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:350) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(IntegrationTestUtils.java:759)
 at 
org.apache.kafka.streams.integration.SelfJoinUpgradeIntegrationTest.processKeyValueAndVerifyCount(SelfJoinUpgradeIntegrationTest.java:244)
 at 
org.apache.kafka.streams.integration.SelfJoinUpgradeIntegrationTest.shouldUpgradeWithTopologyOptimizationOff(SelfJoinUpgradeIntegrationTest.java:155)

 

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12835/4/testReport/org.apache.kafka.streams.integration/SelfJoinUpgradeIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldUpgradeWithTopologyOptimizationOff/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore

2022-11-11 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12935.

Assignee: Lucas Brutschy

Fixed again via 
https://github.com/apache/kafka/commit/ce5faa222b3f58a74994190e3a6267ac87ee21a8

> Flaky Test 
> RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
> 
>
> Key: KAFKA-12935
> URL: https://issues.apache.org/jira/browse/KAFKA-12935
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: flaky-test
> Attachments: 
> RestoreIntegrationTest#shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore[true].rtf
>
>
> {quote}java.lang.AssertionError: Expected: <0L> but: was <5005L> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at 
> org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(RestoreIntegrationTest.java:374)
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14382) StreamThreads can miss rebalance events when processing records during a rebalance

2022-11-10 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14382:
--

 Summary: StreamThreads can miss rebalance events when processing 
records during a rebalance
 Key: KAFKA-14382
 URL: https://issues.apache.org/jira/browse/KAFKA-14382
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


One of the main improvements introduced by the cooperative protocol was the 
ability to continue processing records during a rebalance. In Streams, we take 
advantage of this by polling with a timeout of 0 when a rebalance is/has been 
in progress, so it can return immediately and continue on through the main loop 
to process new records. The main poll loop uses an algorithm based on the 
max.poll.interval.ms to ensure the StreamThread returns to call #poll in time 
to stay in the consumer group.

 

Generally speaking, it should exit the processing loop and invoke poll within a 
few minutes at most based on the poll interval, though typically it will break 
out much sooner once it's used up all the records from the last poll (based on 
the max.poll.records config which Streams sets to 1,000 by default). However, 
if doing heavy processing or setting a higher max.poll.records, the thread may 
continue processing for more than a few seconds. If it had sent out a JoinGroup 
request before going on to process and was waiting for its JoinGroup response, 
then once it does return to invoke #poll it will process this response and send 
out a SyncGroup – but if the processing took too long, this SyncGroup may 
immediately fail with the REBALANCE_IN_PROGRESS error.

 

Essentially, while the thread was processing the group leader will itself be 
processing the JoinGroup subscriptions of all members and generating an 
assignment, then sending this back in its SyncGroup. This may take only a few 
seconds or less, and the group coordinator will not yet have noticed (or care) 
that one of the consumers hasn't sent a SyncGroup – it will just return the 
assigned partitions in the SyncGroup request of the members who have responded 
in time, and "complete" the rebalance in their eyes. But if the assignment 
involved moving any partitions from one consumer to another, then it will need 
to trigger a followup rebalance right away to finish assigning those partitions 
which were revoked in the previous rebalance. This is what causes a new 
rebalance to be kicked off just seconds after the first one began.

 

If the consumer that was stuck processing was among those who needed to revoke 
partitions, this can lead to repeating rebalances – since it fails the 
SyncGroup of the 1st rebalance it never receives the assignment for it and 
never knows to revoke those partitions, meaning it will rejoin for the new 
rebalance still claiming them among its ownedPartitions. When the assignor 
generates the same assignment for the 2nd rebalance, it will again see that 
some partitions need to be revoked and will therefore trigger yet another new 
rebalance after finishing the 2nd. This can go on for as long as the 
StreamThreads are struggling to finish the JoinGroup phase in time due to 
processing.
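A self-contained sketch of the timing problem described above (this is not the actual StreamThread code; the names and the safety margin are assumptions):

{code:java}
import java.time.Duration;
import java.util.Queue;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

class PollTimingSketch {
    // stand-in for per-record user processing
    static void process(final ConsumerRecord<byte[], byte[]> record) { }

    static void runOnce(final Consumer<byte[], byte[]> consumer,
                        final Queue<ConsumerRecord<byte[], byte[]>> buffered,
                        final long maxPollIntervalMs) {
        // assumed safety margin: leave part of the poll interval for the next poll()
        final long deadline = System.currentTimeMillis() + maxPollIntervalMs / 2;
        while (!buffered.isEmpty() && System.currentTimeMillis() < deadline) {
            process(buffered.poll());   // heavy processing here delays the next poll()...
        }
        // ...so a pending JoinGroup response is only handled on this poll; if the leader has already
        // completed the rebalance, the SyncGroup sent now may fail with REBALANCE_IN_PROGRESS
        consumer.poll(Duration.ZERO);
    }
}
{code}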



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-13891) sync group failed with rebalanceInProgress error causes many rounds of rebalance in cooperative

2022-11-08 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reopened KAFKA-13891:


Reopening – original fix was reverted, we should instead fix this assignor-side 
by making it smarter about partition ownership across generations. Basically, 
it should take as the previous owner whichever consumer has the highest 
generation and claims it among their owned partitions

 

[~showuon] I probably won't be able to get to this within the next few days so 
if you're interested in picking up this fix go ahead and I'll find time to 
review – otherwise I will try to get to it in time for the 3.4 release
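A minimal sketch of that assignor-side idea, i.e. when several members claim the same partition, trust the claim from the member with the highest generation (all names here are illustrative, not the actual assignor code):

{code:java}
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

static Optional<String> previousOwner(final TopicPartition partition,
                                      final Map<String, Integer> generationByMember,
                                      final Map<String, Set<TopicPartition>> ownedByMember) {
    return generationByMember.entrySet().stream()
            .filter(e -> ownedByMember.getOrDefault(e.getKey(), Set.of()).contains(partition))
            .max(Map.Entry.comparingByValue())   // highest generation wins the ownership claim
            .map(Map.Entry::getKey);
}
{code}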

> sync group failed with rebalanceInProgress error causes many rounds of rebalance 
> in cooperative
> ---
>
> Key: KAFKA-13891
> URL: https://issues.apache.org/jira/browse/KAFKA-13891
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Shawn Wang
>Priority: Major
> Fix For: 3.3.0, 3.2.4
>
>
> This issue was first found in 
> [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419]
> But the previous PR forgot to reset the generation when the sync group failed with a 
> rebalanceInProgress error. So the previous bug still exists and it may cause the 
> consumer to go through many rounds of rebalancing before finally stabilizing.
> Here's the example ({*}bold is added{*}):
>  # consumer A joined and synced group successfully with generation 1 *( with 
> ownedPartition P1/P2 )*
>  # New rebalance started with generation 2, consumer A joined successfully, 
> but somehow, consumer A doesn't send out sync group immediately
>  # other consumer completed sync group successfully in generation 2, except 
> consumer A.
>  # After consumer A sends out its sync group, the new rebalance starts with 
> generation 3. So consumer A gets a REBALANCE_IN_PROGRESS error in its sync group 
> response
>  # When receiving REBALANCE_IN_PROGRESS, we re-join the group, with 
> generation 3, with the assignment (ownedPartition) in generation 1.
>  # So, now, an out-of-date ownedPartitions has been sent, with unexpected results 
> as a consequence
>  # *After the generation-3 rebalance, consumer A got P3/P4 partition. the 
> ownedPartition is ignored because of old generation.*
>  # *consumer A revoke P1/P2 and re-join to start a new round of rebalance*
>  # *if some other consumer C fails to syncGroup before consumer A's 
> joinGroup, the same issue happens again and results in many rounds of 
> rebalance before the group is stable*
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14211) Streams log message has partition and offset transposed

2022-11-06 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-14211.

Resolution: Fixed

Resolving this since it's apparently fixed by PR (see Bruno's comment) – 
[~cadonna]  can you fill out the "Fix Version" for this?

> Streams log message has partition and offset transposed
> ---
>
> Key: KAFKA-14211
> URL: https://issues.apache.org/jira/browse/KAFKA-14211
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.1
>Reporter: Matt Allwood
>Priority: Minor
>
> The log warning message for out-of-order KTable update has partition and 
> offset the wrong way around.
> For example:
> {noformat}
> [...-StreamThread-1] WARN 
> org.apache.kafka.streams.kstream.internals.KTableSource - Detected 
> out-of-order KTable update for KTABLE-FK-JOIN-OUTPUT-STATE-STORE-000274, 
> old timestamp=[1649245600022] new timestamp=[1642429126882]. 
> topic=[...-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-000269-topic] 
> partition=[2813] offset=[0].{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14123) Delete with null value not supported in Streams PersistentWindowStore

2022-10-20 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-14123.

Resolution: Not A Problem

> Delete with null value not supported in Streams PersistentWindowStore
> --
>
> Key: KAFKA-14123
> URL: https://issues.apache.org/jira/browse/KAFKA-14123
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Pawan Sharma
>Priority: Major
>
> Unable to delete a window entry from the persistent window store by passing a 
> null value in the body.
>  
> The put in this class does not check whether the value is null and invoke the 
> remove method.
> [https://github.com/apache/kafka/blob/3.0.0/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java]
>  
> Whereas the same feature works in InMemoryWindowStore, where null 
> values are treated as deletes (line 126).
> [https://github.com/apache/kafka/blob/3.0.0/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java]
>  
> This behaviour contrasts a little with all other stores, including KV stores, 
> where a null value is treated as a delete, and also complies with the behaviour 
> of a compacted Kafka topic.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14318) KIP-878: Autoscaling for Statically Partitioned Streams

2022-10-18 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14318:
--

 Summary: KIP-878: Autoscaling for Statically Partitioned Streams
 Key: KAFKA-14318
 URL: https://issues.apache.org/jira/browse/KAFKA-14318
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: A. Sophie Blee-Goldman
Assignee: A. Sophie Blee-Goldman
 Fix For: 3.4.0


[KIP-878: Autoscaling for Statically Partitioned 
Streams|https://cwiki.apache.org/confluence/display/KAFKA/KIP-878%3A+Autoscaling+for+Statically+Partitioned+Streams]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14008) Add docs for new metrics introduced in KIP-846

2022-06-18 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14008:
--

 Summary: Add docs for new metrics introduced in KIP-846
 Key: KAFKA-14008
 URL: https://issues.apache.org/jira/browse/KAFKA-14008
 Project: Kafka
  Issue Type: Task
  Components: docs, streams
Reporter: A. Sophie Blee-Goldman
 Fix For: 3.3.0


Need to write docs for 
[KIP-846|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211886093]
 # Note in upgrade guide
 # Add the following to the [Streams metric 
docs|https://kafka.apache.org/documentation/#kafka_streams_monitoring]:

 ## bytes-consumed-total
 ## records-consumed-total
 ## bytes-produced-total
 ## records-produced-total



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13957) Flaky Test StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores

2022-06-02 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13957:
--

 Summary: Flaky Test 
StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores
 Key: KAFKA-13957
 URL: https://issues.apache.org/jira/browse/KAFKA-13957
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


Failed on a local build so I have the full logs (attached)
{code:java}
java.lang.AssertionError: Unexpected exception thrown while getting the value 
from store.
Expected: is (a string containing "Cannot get state store source-table because 
the stream thread is PARTITIONS_ASSIGNED, not RUNNING" or a string containing 
"The state store, source-table, may have migrated to another instance" or a 
string containing "Cannot get state store source-table because the stream 
thread is STARTING, not RUNNING")
 but: was "The specified partition 1 for store source-table does not exist."
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.verifyRetrievableException(StoreQueryIntegrationTest.java:539)
at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQuerySpecificActivePartitionStores$5(StoreQueryIntegrationTest.java:241)
at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:557)
at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:183)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:833) {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13945) Add task-level metrics to Streams for bytes/records Produced

2022-05-28 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13945:
--

 Summary: Add task-level metrics to Streams for bytes/records 
Produced
 Key: KAFKA-13945
 URL: https://issues.apache.org/jira/browse/KAFKA-13945
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-03-30 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-13152.

Resolution: Fixed

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes" 
> ---
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.3.0
>
>
> The current config "buffered.records.per.partition" controls the maximum number 
> of records to bookkeep, and hence when it is exceeded we pause fetching from 
> this partition. However this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on 
> the dynamic number of partitions assigned.
> * Record size can vary from case to case.
> And hence it's hard to bound the memory usage for this buffering. We should 
> consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", 
> which controls how many bytes in total are allowed to be buffered. This is 
> doable since we buffer the raw records in .



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Reopened] (KAFKA-13542) Utilize the new Consumer#enforceRebalance(reason) API in Streams

2022-03-15 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reopened KAFKA-13542:


Reopening this since it was reverted pending investigation into a benchmark 
regression

> Utilize the new Consumer#enforceRebalance(reason) API in Streams
> 
>
> Key: KAFKA-13542
> URL: https://issues.apache.org/jira/browse/KAFKA-13542
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Hao Li
>Priority: Blocker
> Fix For: 3.2.0
>
>
> KIP-800 is adding a new "reason" parameter to the Consumer#enforceRebalance 
> API, which will be passed in to a new field of the JoinGroup protocol. We 
> invoke this API throughout Streams for various reasons, which are very useful 
> for debugging the cause of rebalancing. Passing in the reason to this new API 
> would make it possible to figure out why a Streams client triggered a 
> rebalance from the broker logs, which are often the only logs available when 
> the client logs cannot be retrieved for whatever reason



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13713) Tech Debt: keep StreamThread and TopologyMetadata's view of the topology in sync

2022-03-06 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13713:
--

 Summary: Tech Debt: keep StreamThread and TopologyMetadata's view 
of the topology in sync
 Key: KAFKA-13713
 URL: https://issues.apache.org/jira/browse/KAFKA-13713
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman


Currently when the topology is modified via an add/remove request, we 
immediately update the TopologyMetadata with the new/removed topology and then 
register listeners for each request so we can complete it once all threads have 
ack'ed the corresponding update, ie upgraded to that minimum topology version.

 For consistency we should consider trying to keep the topology on the minimum 
common version across all (live/active group member) threads. Once a thread 
notices a topology update has been queued, it will update its own view and bump 
it to the latest topology version. We then check if the minimum common topology 
version has increased and then upgrade the official topology as tracked by the 
TopologyMetadata if so.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13712) Make topology addition/removal atomic so we can roll back if request fails

2022-03-06 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13712:
--

 Summary: Make topology addition/removal atomic so we can roll back 
if request fails
 Key: KAFKA-13712
 URL: https://issues.apache.org/jira/browse/KAFKA-13712
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13711) Fix bugs with input topic management to support pattern subscription fully

2022-03-06 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13711:
--

 Summary: Fix bugs with input topic management to support pattern 
subscription fully
 Key: KAFKA-13711
 URL: https://issues.apache.org/jira/browse/KAFKA-13711
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman


See https://github.com/apache/kafka/pull/11601



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13690) Flaky test EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown[at_least_once]

2022-02-24 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13690:
--

 Summary: Flaky test 
EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown[at_least_once]
 Key: KAFKA-13690
 URL: https://issues.apache.org/jira/browse/KAFKA-13690
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


The _at_least_once_ version of the 
"{*}EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown"{*} test 
is occasionally failing with
h3. Error Message

java.lang.AssertionError: The committed records do not match what expected 
Expected: <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), 
KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 28), KeyValue(0, 
36), KeyValue(0, 45)]> but: was <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 
3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 10), KeyValue(0, 11), 
KeyValue(0, 13), KeyValue(0, 16), KeyValue(0, 20), KeyValue(0, 25), KeyValue(0, 
31), KeyValue(0, 38)]>

 

It seems we are receiving more records than expected.

...of course, this is an ALOS flavor of the {*}EOS{*}IntegrationTest, so 
perhaps we shouldn't be running this variant at all? Not sure if this explains 
the exact output we receive but it certainly seems suspicious

 

Added at_least_once in [https://github.com/apache/kafka/pull/11283]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13645) Support the TopologyTestDriver with modular topologies

2022-02-03 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13645:
--

 Summary: Support the TopologyTestDriver with modular topologies
 Key: KAFKA-13645
 URL: https://issues.apache.org/jira/browse/KAFKA-13645
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman


Currently the TTD accepts only a single Topology. Users can technically just 
use one TTD per Topology, but for a complete simulation of the actual 
KafkaStreams app we'll need to add support for processing multiple modular 
topologies with the TopologyTestDriver



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13644) Support global state stores with modular topologies

2022-02-03 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13644:
--

 Summary: Support global state stores with modular topologies
 Key: KAFKA-13644
 URL: https://issues.apache.org/jira/browse/KAFKA-13644
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13643) Replace "NamedTopology" with "ModularTopology" in the codebase

2022-02-03 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13643:
--

 Summary: Replace "NamedTopology" with "ModularTopology" in the 
codebase
 Key: KAFKA-13643
 URL: https://issues.apache.org/jira/browse/KAFKA-13643
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13590) Rename InternalTopologyBuilder's #topicGroups method to be more descriptive

2022-01-10 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13590:
--

 Summary: Rename InternalTopologyBuilder's #topicGroups method to 
be more descriptive
 Key: KAFKA-13590
 URL: https://issues.apache.org/jira/browse/KAFKA-13590
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman


Pretty much what the title says: it can be difficult to figure out what this 
method is actually returning based on the method name. At least javadocs were 
added in a recent PR, but ideally you wouldn't need to visit the method's 
implementation at all to understand its function.

 

See 
[https://github.com/apache/kafka/pull/11600/files#r768947553]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13543) Consumer may pass stale cluster metadata to the assignor following a subscription update

2021-12-13 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13543:
--

 Summary: Consumer may pass stale cluster metadata to the assignor 
following a subscription update
 Key: KAFKA-13543
 URL: https://issues.apache.org/jira/browse/KAFKA-13543
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: A. Sophie Blee-Goldman


A consumer only ever tracks metadata corresponding to its subscribed topics, 
which can cause a race condition during a rebalance immediately after a change 
to the consumer's subscription. Particularly, when new topics are added to the 
subscription but a rebalance is kicked off before the consumer's metadata is 
updated with the new topics, it will pass a stale copy of the cluster metadata 
in to the ConsumerPartitionAssignor#assign method, which may not include the 
newly subscribed topics regardless of whether they do or do not exist.

Most apps are likely unaffected by this, including any consumer client apps 
using OOTB assignors, since a new rebalance will be kicked off when the 
metadata is updated and any partitions from the new topics will be assigned at 
that time. But in Kafka Streams, we do a check during each rebalance to ensure 
that any user input topics are created ahead of time. This race condition can 
result in Streams incorrectly identifying user topics as missing and throwing a 
MissingSourceTopicException when a new topology subscribed to new topics is 
added to the application 

We can work around this for now, but it's unfortunate that we can't distinguish 
between true missing source topics and a transient lack of these topics in the 
metadata. There might also be some plain consumer client apps with custom 
assignors that run into this as well, for more advanced users.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13542) Utilize the new Consumer#enforceRebalance(reason) API in Streams

2021-12-13 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13542:
--

 Summary: Utilize the new Consumer#enforceRebalance(reason) API in 
Streams
 Key: KAFKA-13542
 URL: https://issues.apache.org/jira/browse/KAFKA-13542
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: A. Sophie Blee-Goldman
 Fix For: 3.2.0


KIP-800 is adding a new "reason" parameter to the Consumer#enforceRebalance 
API, which will be passed in to a new field of the JoinGroup protocol. We 
invoke this API throughout Streams for various reasons, which are very useful 
for debugging the cause of rebalancing. Passing in the reason to this new API 
would make it possible to figure out why a Streams client triggered a rebalance 
from the broker logs, which are often the only logs available when the client 
logs cannot be retrieved for whatever reason
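A minimal usage sketch of the new overload from KIP-800; {{consumer}} is assumed to be an already-constructed KafkaConsumer, and the reason string is made up for illustration:

{code:java}
// pass a human-readable reason so the cause of the rebalance can be traced from the broker side
consumer.enforceRebalance("streams client requested a followup rebalance to finish warmup task probing");
{code}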



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13439) Deprecate EAGER rebalancing in Kafka Streams

2021-11-09 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13439:
--

 Summary: Deprecate EAGER rebalancing in Kafka Streams
 Key: KAFKA-13439
 URL: https://issues.apache.org/jira/browse/KAFKA-13439
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman
 Fix For: 3.1.0


Cooperative rebalancing has been the default since 2.4, but we have always had 
to keep the logic for eager rebalancing around to allow users a live upgrade 
path. The current upgrade path involves two rolling bounces, the first one to 
upgrade the byte code and set the UPGRADE_FROM config to keep Kafka Streams on 
the old EAGER protocol until everyone has been upgraded, and a second rolling 
bounce to remove the config and start enabling COOPERATIVE

 

We'd like to finally remove the EAGER protocol and tackle some tech debt its 
presence has accrued, but we should first give users a warning that we intend 
to remove this and that it will require a slight change to the upgrade path for 
any users who want to upgrade from 2.3 or below: going through a "bridge" 
version between 2.4 - 3.1 in the first rolling bounce, before upgrading to the 
final version. 

We should also prepare by logging a warning in 3.1 if we see the UPGRADE_FROM 
config set, informing users that they will need to make sure to remove it before 
the EAGER protocol is removed. Then in version 3.2 (or whenever we remove it) 
we can throw an exception and shut down if a user has set the UPGRADE_FROM 
flag to a pre-2.4 version. 
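For reference, a sketch of the two rolling bounces in the current upgrade path (values are illustrative):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
// first rolling bounce: upgrade the bytecode but stay on the EAGER protocol
props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23);

// second rolling bounce: remove upgrade.from so the group switches to COOPERATIVE
props.remove(StreamsConfig.UPGRADE_FROM_CONFIG);
{code}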



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13381) Wrap all uncaught exceptions as StreamsException with TaskId field

2021-10-22 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-13381.

Resolution: Fixed

> Wrap all uncaught exceptions as StreamsException with TaskId field
> --
>
> Key: KAFKA-13381
> URL: https://issues.apache.org/jira/browse/KAFKA-13381
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: kip
> Fix For: 3.1.0
>
>
> KIP-783: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-783%3A+Add+TaskId+field+to+StreamsException]
>  
> Currently only some exceptions that occur during processing are wrapped as a 
> StreamsException, which complicates the logic required by a user custom 
> StreamsUncaughtExceptionHandler. It would be cleaner to ensure that all 
> exceptions thrown to the user/handler are wrapped (exactly once) as a 
> StreamsException.
> Further, many exceptions can be traced back to a particular task: eg due to a 
> timeout of that task, or thrown during Task#process, or while 
> closing/suspending/etc that task. It can be helpful both to debugging as well 
> as to handling to have that information, so we can add a TaskId field to the 
> StreamsException class to help users identify the source of an exception



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633

2021-10-21 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12994.

Resolution: Fixed

> Migrate all Tests to New API and Remove Suppression for Deprecation Warnings 
> related to KIP-633
> ---
>
> Key: KAFKA-12994
> URL: https://issues.apache.org/jira/browse/KAFKA-12994
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 3.0.0
>Reporter: Israel Ekpo
>Assignee: Andrew patterson
>Priority: Major
>  Labels: kip-633, newbie, newbie++
> Fix For: 3.1.0
>
>
> Due to the API changes for KIP-633 a lot of deprecation warnings have been 
> generated in tests that are using the old deprecated APIs. There are a lot of 
> tests using the deprecated methods. We should absolutely migrate them all to 
> the new APIs and then get rid of all the applicable annotations for 
> suppressing the deprecation warnings.
> The applies to all Java and Scala examples and tests using the deprecated 
> APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows 
> classes.
>  
> This is based on the feedback from reviewers in this PR
>  
> https://github.com/apache/kafka/pull/10926



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13381) Wrap all uncaught exceptions as StreamsException with TaskId field

2021-10-17 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13381:
--

 Summary: Wrap all uncaught exceptions as StreamsException with 
TaskId field
 Key: KAFKA-13381
 URL: https://issues.apache.org/jira/browse/KAFKA-13381
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman


Currently only some exceptions that occur during processing are wrapped as a 
StreamsException, which complicates the logic required by a user custom 
StreamsUncaughtExceptionHandler. It would be cleaner to ensure that all 
exceptions thrown to the user/handler are wrapped (exactly once) as a 
StreamsException.

Further, many exceptions can be traced back to a particular task: eg due to a 
timeout of that task, or thrown during Task#process, or while 
closing/suspending/etc that task. It can be helpful both to debugging as well 
as to handling to have that information, so we can add a TaskId field to the 
StreamsException class to help users identify the source of an exception
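A hedged sketch of how a custom handler could use such a field; the {{taskId()}} accessor name is an assumption for this example:

{code:java}
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

final StreamsUncaughtExceptionHandler handler = exception -> {
    if (exception instanceof StreamsException) {
        // assumed accessor: an Optional identifying the task the exception came from
        ((StreamsException) exception).taskId()
                .ifPresent(id -> System.err.println("Exception originated from task " + id));
    }
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
};
{code}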



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13336) Migrate StreamsBuilder class to interface with factory method on KafkaStreams

2021-09-29 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13336:
--

 Summary: Migrate StreamsBuilder class to interface with factory 
method on KafkaStreams
 Key: KAFKA-13336
 URL: https://issues.apache.org/jira/browse/KAFKA-13336
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman


In order to refactor and improve the streams physical plan generation, we'll 
need to clean up the DSL builder API a bit and in particular enforce the 
configs be passed in from the beginning, rather than only when calling #build. 
We can also use this opportunity to improve the disconnect between the builder, 
the resulting Topology, and the Kafka Streams application that ultimately runs 
this topology – at the moment these are all completely uncoupled on the 
surface, so it's easy to think that a StreamsBuilder can be reused to build 
multiple Topology objects, or that a Topology object could be passed in to 
different KafkaStreams. However there is internal state that is shared and 
modified during StreamsBuilder#build and in the KafkaStreams constructor, and 
they are actually very coupled under the hood meaning there must be a 1:1:1 
ratio of builder to topology to KafkaStreams. So we need a new API that
 # Forces users to pass in the configs (Properties) when constructing the 
builder
 # Clarifies the relationship of the builder object to the topology, and to the 
app itself

I think a good API for this might look something like this:
 # Move the StreamsBuilder class to an internal one (technically we would need 
to keep it where it is for now until a full deprecation cycle)
 # Introduce a TopologyBuilder interface to replace the functionality of the 
current StreamsBuilder class, and have StreamsBuilder implement this. All the 
current methods on StreamsBuilder will be moved to the TopologyBuilder 
interfaces
 # Add a factory method on KafkaStreams for users to get instances of the 
TopologyBuilder, and have this accept a Properties. For example

{code:java}
class KafkaStreams {
public TopologyBuilder newTopologyBuilder(final Properties props) {
// convert to StreamsConfig to validate configs & check for 
application.id
final StreamsConfig config = new StreamsConfig(props); 
return new StreamsBuilder(config);
}
}{code}
This should satisfy both of the requirements, and imo provides a cleaner API 
anyways. Getting the builder through a factory method on the KafkaStreams 
object should make it clear that this builder is tied to that particular 
KafkaStreams instance. And we can enforce that it isn't reused for a different 
application by parsing the Properties passed in to 
KafkaStreams#newTopologyBuilder, specifically the application.id



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13333) Optimize condition for triggering rebalance after wiping out corrupted task

2021-09-28 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13333:
--

 Summary: Optimize condition for triggering rebalance after wiping 
out corrupted task
 Key: KAFKA-13333
 URL: https://issues.apache.org/jira/browse/KAFKA-13333
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman


Just filing a ticket to list some thoughts I had on optimizing 
https://issues.apache.org/jira/browse/KAFKA-12486. 

The idea here is to trigger a rebalance upon detecting corruption of some task. 
This task may have had a large amount of state that had to be wiped out under 
eos, so we might be able to avoid a long downtime due to restoration if we can 
utilize the HA TaskAssignor to temporarily move that active task to another 
node that has some state for it already (eg had a standby task for it).

Right now, we trigger that rebalance under the condition that (a) eos is 
enabled, and (b) at least one of the corrupted tasks was an active task. This 
is a pretty safe bet, but it's worth jotting down some potential optimizations 
of this condition so we can trim down the occurrences of unnecessary rebalances 
that wouldn't have helped (a sketch combining these follows the list below). For example:

1) Don't kick off a rebalance if the corrupted task is in CREATED or RESTORING, 
and is not within the acceptable.recovery.lag from the end of the changelog. If 
the task wasn't caught up on this host but assigned to it anyway, that 
indicates there wasn't any other host with enough state for this task and 
therefore no one to temporarily take it over

2) Only trigger a rebalance if standbys are configured, and/or parse the 
standby host info to verify whether this task has a standby copy on another 
live client. It's still possible to have a copy of this task's state on another 
host even without standbys, but the odds are greatly reduced.

3) If we want to get really fancy (and I'm not quite sure we do), we could have 
the assignor report not just the names but also the lag of each standby task on 
another host, and then trigger the rebalance depending on whether this task has 
a hot standby within the acceptable.recovery.lag
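To make the conditions above concrete, here is a self-contained sketch combining the current trigger with optimization (1); every name here is hypothetical, not actual Streams code:

{code:java}
enum TaskState { CREATED, RESTORING, RUNNING }

static boolean shouldTriggerRebalance(final boolean eosEnabled,
                                      final boolean activeTask,
                                      final TaskState state,
                                      final long taskLag,
                                      final long acceptableRecoveryLag) {
    // current condition: eos enabled and at least one corrupted task is active
    if (!eosEnabled || !activeTask) {
        return false;
    }
    // optimization (1): skip the rebalance if this task was assigned here despite not being
    // caught up, since that implies no other client had enough state to take it over temporarily
    final boolean restoringAndFarBehind =
            (state == TaskState.CREATED || state == TaskState.RESTORING) && taskLag > acceptableRecoveryLag;
    return !restoringAndFarBehind;
}
{code}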



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12486) Utilize HighAvailabilityTaskAssignor to avoid downtime on corrupted task

2021-09-28 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12486.

Resolution: Fixed

> Utilize HighAvailabilityTaskAssignor to avoid downtime on corrupted task
> 
>
> Key: KAFKA-12486
> URL: https://issues.apache.org/jira/browse/KAFKA-12486
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Critical
> Fix For: 3.1.0
>
>
> In KIP-441, we added the HighAvailabilityTaskAssignor to address certain 
> common scenarios which tend to lead to heavy downtime for tasks, such as 
> scaling out. The new assignor will always place an active task on a client 
> which has a "caught-up" copy of that tasks' state, if any exists, while the 
> intended recipient will instead get a standby task to warm up the state in 
> the background. This way we keep tasks live as much as possible, and avoid 
> the long downtime imposed by state restoration on active tasks.
> We can actually expand on this to reduce downtime due to restoring state: 
> specifically, we may throw a TaskCorruptedException on an active task which 
> leads to wiping out the state stores of that task and restoring from scratch. 
> There are a few cases where this may be thrown:
>  # No checkpoint found with EOS
>  # TimeoutException when processing a StreamTask
>  # TimeoutException when committing offsets under eos
>  # RetriableException in RecordCollectorImpl
> (There is also the case of OffsetOutOfRangeException, but that is excluded 
> here since it only applies to standby tasks).
> We should consider triggering a rebalance when we hit TaskCorruptedException 
> on an active task, after we've wiped out the corrupted state stores. This 
> will allow the assignor to temporarily redirect this task to another client 
> who can resume work on the task while the original owner works on restoring 
> the state from scratch.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13332) New pattern-matched topic with more partitions than existing matched topics can crash Kafka Streams

2021-09-28 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13332:
--

 Summary: New pattern-matched topic with more partitions than 
existing matched topics can crash Kafka Streams
 Key: KAFKA-13332
 URL: https://issues.apache.org/jira/browse/KAFKA-13332
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


The partition count resolution logic in Streams is used to determine the number 
of partitions for any repartition topics that don't already exist. This is done 
by parsing the topology to find the number of partitions of all upstream 
topics, and taking the max. For Pattern-subscribed subtopologies, this means 
you need to ensure that at least one topic matching this pattern is created 
prior to starting up the app. That topic, or topics, will determine the number 
of partitions for any downstream repartitions.
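A minimal sketch of that resolution logic (not the actual Streams code):

{code:java}
import java.util.Collection;
import java.util.Map;
import java.util.Objects;

static int repartitionTopicPartitionCount(final Collection<String> upstreamTopics,
                                          final Map<String, Integer> partitionsByTopic) {
    return upstreamTopics.stream()
            .map(partitionsByTopic::get)
            .filter(Objects::nonNull)               // pattern-matched topics that don't exist yet
            .mapToInt(Integer::intValue)
            .max()                                  // downstream repartition topics get the max upstream count
            .orElseThrow(() -> new IllegalStateException("no matching input topics exist yet"));
}
{code}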

The problem is that repartition topics are created once, the first time the app 
is started up. After that, during each rebalance Streams will validate all 
repartition topics including checking for their existence, and verifying they 
have the correct number of partitions. This check will fail if a new topic is 
created after the first initialization, which matches the pattern but has more 
partitions than any of the existing topics.

This means that unfortunately, you can't create a new input topic that matches 
the pattern your app is subscribed to unless it has equal or fewer partitions 
than the existing matching topics. If you do, you would need to stop all 
instances and delete the existing repartition topics before creating this new 
topic



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2021-09-13 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13295:
--

 Summary: Long restoration times for new tasks can lead to 
transaction timeouts
 Key: KAFKA-13295
 URL: https://issues.apache.org/jira/browse/KAFKA-13295
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman
 Fix For: 3.1.0


In some EOS applications with relatively long restoration times we've noticed a 
series of ProducerFencedExceptions occurring during/immediately after 
restoration. The broker logs were able to confirm these were due to 
transactions timing out.

In Streams, it turns out we automatically begin a new txn when calling {{send}} 
(if there isn’t already one in flight). A {{send}} occurs often outside a 
commit during active processing (eg writing to the changelog), leaving the txn 
open until the next commit. And if a StreamThread has been actively processing 
when a rebalance results in a new stateful task without revoking any existing 
tasks, the thread won’t actually commit this open txn before it goes back into 
the restoration phase while it builds up state for the new task. So the 
in-flight transaction is left open during restoration, during which the 
StreamThread only consumes from the changelog without committing, leaving it 
vulnerable to timing out when restoration times exceed the configured 
transaction.timeout.ms for the producer client.
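This is not a fix for the underlying issue, but for reference, the relevant producer setting can be raised through the Streams producer prefix to give restoration more headroom (the value below is only an example):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
// example value only: allow up to 15 minutes before an open transaction expires
props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 900_000);
{code}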



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13283) Migrate experimental feature to public API

2021-09-08 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13283:
--

 Summary: Migrate experimental feature to public API
 Key: KAFKA-13283
 URL: https://issues.apache.org/jira/browse/KAFKA-13283
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13282) Draft final NamedTopology API and publish a KIP

2021-09-08 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13282:
--

 Summary: Draft final NamedTopology API and publish a KIP
 Key: KAFKA-13282
 URL: https://issues.apache.org/jira/browse/KAFKA-13282
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman


The pre-KIP experimental phase has left quite a few open questions around the 
API of this new feature; we need to hash that out and then write it up 
into a KIP before introducing this in the public interface.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13281) Support upgrades with dynamic addition/removal of disjoint "named" topologies

2021-09-08 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13281:
--

 Summary: Support upgrades with dynamic addition/removal of 
disjoint "named" topologies
 Key: KAFKA-13281
 URL: https://issues.apache.org/jira/browse/KAFKA-13281
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: A. Sophie Blee-Goldman






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-08-27 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-13128.

Resolution: Fixed

> Flaky Test 
> StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
> 
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0, 2.8.1
>Reporter: A. Sophie Blee-Goldman
>Assignee: Walker Carlson
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.1.0
>
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12963) Improve error message for Class cast exception

2021-08-27 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12963.

Fix Version/s: 3.1.0
   Resolution: Fixed

> Improve error message for Class cast exception
> --
>
> Key: KAFKA-12963
> URL: https://issues.apache.org/jira/browse/KAFKA-12963
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Rasmus Helbig Hansen
>Assignee: Andrew Lapidas
>Priority: Minor
> Fix For: 3.1.0
>
>
> After a topology change and starting the application again, we got this type 
> of error message:
>  [g9z-StreamThread-1] ERROR 
> org.apache.kafka.streams.processor.internals.TaskManager  - stream-thread 
> [g9z-StreamThread-1] Failed to process stream task 1_12 due to the following 
> error:
>  org.apache.kafka.streams.errors.StreamsException: ClassCastException 
> invoking Processor. Do the Processor's input types match the deserialized 
> types? Check the Serde setup and change the default Serdes in StreamConfig or 
> provide correct Serdes via method parameters. Make sure the Processor can 
> accept the deserialized input of type key: org.acme.SomeKey, and value: 
> org.acme.SomeValue.
>  Note that although incorrect Serdes are a common cause of error, the cast 
> exception might have another cause (in user code, for example). For example, 
> if a processor wires in a store, but casts the generics incorrectly, a class 
> cast exception could be raised during processing, but the cause would not be 
> wrong Serdes.
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>  at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
>  Caused by: java.lang.ClassCastException: class org.acme.SomeValue cannot be 
> cast to class org.acme.OtherValue (org.acme.SomeValue and org.acme.OtherValue 
> are in unnamed module of loader 'app')
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:112)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>  ... 20 more
>  

[jira] [Resolved] (KAFKA-8734) Remove PartitionAssignorAdapter and deprecated PartitionAssignor interface

2021-08-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-8734.
---
Fix Version/s: 3.0.0
   Resolution: Fixed

> Remove PartitionAssignorAdapter and deprecated PartitionAssignor interface
> --
>
> Key: KAFKA-8734
> URL: https://issues.apache.org/jira/browse/KAFKA-8734
> Project: Kafka
>  Issue Type: Task
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In 2.4 we deprecated the consumer.internal.PartitionAssignor interface and 
> migrated all assignors to the [new public consumer.ConsumerPartitionAssignor 
> interface|https://github.com/apache/kafka/pull/7108/files]. 
>  Although internal, we provided an 
> [adapter|https://github.com/apache/kafka/pull/7110] for those who may have 
> implemented a custom PartitionAssignor to avoid breaking changes. These 
> should be removed in the next major release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so

2021-08-18 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13217:
--

 Summary: Reconsider skipping the LeaveGroup on close() or add an 
overload that does so
 Key: KAFKA-13217
 URL: https://issues.apache.org/jira/browse/KAFKA-13217
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman


In Kafka Streams, when an instance is shut down via the close() API, we 
intentionally skip sending a LeaveGroup request. This is because often the 
shutdown is not due to a scaling down event but instead some transient closure, 
such as during a rolling bounce. In cases where the instance is expected to 
start up again shortly after, we originally wanted to prevent that member's tasks 
from being redistributed across the remaining group members, since this would 
disturb the stable assignment and could cause unnecessary state migration and 
restoration. We also hoped to limit the disruption to just a single rebalance, 
rather than forcing the group to rebalance once when the member shuts down and 
then again when it comes back up. So it's really an optimization for the case in 
which the shutdown is temporary.
 
That said, many of those optimizations are no longer necessary or at least much 
less useful given recent features and improvements. For example, rebalances are 
now lightweight, so skipping the second rebalance is less worth optimizing for, 
and the new assignor will take into account the actual underlying state for 
each task/partition assignment, rather than just the previous assignment, so 
the assignment should be considerably more stable across bounces and rolling 
restarts. 
 
Given that, it might be time to reconsider this optimization. Alternatively, we 
could introduce another form of the close() API that forces the member to leave 
the group, to be used in the event of an actual scale-down rather than a transient 
bounce.
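
For illustration only, here is a purely hypothetical sketch of the kind of opt-in overload described above; none of these names are real Kafka APIs, it just makes the shape of the alternative concrete.

{code:java}
import java.time.Duration;

// Purely illustrative: one possible shape for a close() variant that lets the
// caller decide whether this particular shutdown should send a LeaveGroup request.
// None of these names are actual Kafka APIs.
public interface GroupAwareShutdown {

    /** Current behavior: close quietly and let the member time out of the group. */
    void close(Duration timeout);

    /**
     * Hypothetical overload: close and, if leaveGroup is true, proactively send a
     * LeaveGroup so the remaining members rebalance immediately (e.g. on scale-down).
     */
    void close(Duration timeout, boolean leaveGroup);
}
{code}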



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-08-06 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reopened KAFKA-13128:

  Assignee: (was: A. Sophie Blee-Goldman)

Failed again for a different reason – just flaky, seems we need to wait for the 
thread to fully start up

 

{{java.lang.AssertionError: Unexpected exception thrown while getting the value 
from store.
Expected: is (a string containing "Cannot get state store source-table because 
the stream thread is PARTITIONS_ASSIGNED, not RUNNING" or a string containing 
"The state store, source-table, may have migrated to another instance")
 but: was "Cannot get state store source-table because the stream thread is 
STARTING, not RUNNING"}}

> Flaky Test 
> StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
> 
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0, 2.8.1
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.0.0, 2.8.1
>
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13170) Flaky Test InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown

2021-08-05 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13170:
--

 Summary: Flaky Test 
InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown
 Key: KAFKA-13170
 URL: https://issues.apache.org/jira/browse/KAFKA-13170
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Reporter: A. Sophie Blee-Goldman


[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11176/2/testReport/org.apache.kafka.streams.processor.internals/InternalTopicManagerTest/Build___JDK_8_and_Scala_2_12___shouldRetryDeleteTopicWhenTopicUnknown_2/]
{code:java}
Stacktrace
java.lang.AssertionError: unexpected exception type thrown; 
expected: but 
was:
  at org.junit.Assert.assertThrows(Assert.java:1020)
  at org.junit.Assert.assertThrows(Assert.java:981)
  at 
org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldRetryDeleteTopicWhenRetriableException(InternalTopicManagerTest.java:526)
  at 
org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown(InternalTopicManagerTest.java:497)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13169) Flaky Test QueryableStateIntegrationTest.shouldBeAbleToQueryStateWithNonZeroSizedCache

2021-08-05 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13169:
--

 Summary: Flaky Test 
QueryableStateIntegrationTest.shouldBeAbleToQueryStateWithNonZeroSizedCache
 Key: KAFKA-13169
 URL: https://issues.apache.org/jira/browse/KAFKA-13169
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Reporter: A. Sophie Blee-Goldman


[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11176/2/testReport/org.apache.kafka.streams.processor.internals/InternalTopicManagerTest/Build___JDK_8_and_Scala_2_12___shouldRetryDeleteTopicWhenTopicUnknown/]
{code:java}
Stacktrace
java.lang.AssertionError: unexpected exception type thrown; 
expected: but 
was:
  at org.junit.Assert.assertThrows(Assert.java:1020)
  at org.junit.Assert.assertThrows(Assert.java:981)
  at 
org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldRetryDeleteTopicWhenRetriableException(InternalTopicManagerTest.java:526)
  at 
org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown(InternalTopicManagerTest.java:497)
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10246) AbstractProcessorContext topic() throws NullPointerException when modifying a state store within the DSL from a punctuator

2021-07-30 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-10246.

Resolution: Fixed

> AbstractProcessorContext topic() throws NullPointerException when modifying a 
> state store within the DSL from a punctuator
> --
>
> Key: KAFKA-10246
> URL: https://issues.apache.org/jira/browse/KAFKA-10246
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
> Environment: linux, windows, java 11
>Reporter: Peter Pringle
>Priority: Major
>
> NullPointerException seen when a KTable statestore is being modified by a 
> punctuated method which is added to a topology via the DSL processor/ktable 
> valueTransformer methods.
> It seems valid for AbstractProcessorContext.topic() to return null; however 
> the check below throws a NullPointerException before a null can be returned.
> {quote}if (topic.equals(NONEXIST_TOPIC)) {
> {quote}
> Made a local fix to reverse the ordering of the check (i.e. avoid the null) 
> and this appears to fix the issue and sends the change to the state store's 
> changelog topic.
> {quote}if (NONEXIST_TOPIC.equals(topic)) {
> {quote}
> Stacktrace below
> {{2020-07-02 07:29:46,829 
> [ABC_aggregator-551a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] ERROR 
> [o.a.k.s.p.i.StreamThread]: stream-thread [ABC_aggregator-5}}
>  {{51a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] Encountered the 
> following error during processing:}}
>  {{java.lang.NullPointerException: null}}
>  \{{ at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)}}
>  \{{ at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:141)}}
>  \{{ at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)}}
>  \{{ at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)}}
>  \{{ at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)}}
>  \{{ at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
>  \{{ at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)}}
>  \{{ at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)}}
>  \{{ at 
> org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:118)}}
>  \{{ at 
> org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:97)}}
>  \{{ at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)}}
>  \{{ at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
>  \{{ at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)}}
>  \{{ at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}}
>  \{{ at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}}
>  \{{ at 
> org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:118)}}
>  \{{ at 
> org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:65)}}
>  \{{ at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)}}
>  \{{ at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
>  \{{ at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)}}
>  \{{ at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}}
>  \{{ at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}}
>  \{{ at 
> org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)}}
>  \{{ at 
> org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)}}
>  \{{ at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119)}}

[jira] [Resolved] (KAFKA-13150) How is Kafkastream configured to consume data from a specified offset ?

2021-07-29 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-13150.

Resolution: Invalid

> How is Kafkastream configured to consume data from a specified offset ?
> ---
>
> Key: KAFKA-13150
> URL: https://issues.apache.org/jira/browse/KAFKA-13150
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: wangjh
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13021) Improve Javadocs for API Changes and address followup from KIP-633

2021-07-23 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-13021.

Resolution: Fixed

> Improve Javadocs for API Changes and address followup from KIP-633
> --
>
> Key: KAFKA-13021
> URL: https://issues.apache.org/jira/browse/KAFKA-13021
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 3.0.0
>Reporter: Israel Ekpo
>Assignee: Israel Ekpo
>Priority: Major
> Fix For: 3.0.0
>
>
> There are Javadoc changes from the following PR that needs to be completed 
> prior to the 3.0 release. This Jira item is to track that work
> [https://github.com/apache/kafka/pull/10926]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-07-22 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13128:
--

 Summary: Flaky Test 
StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
 Key: KAFKA-13128
 URL: https://issues.apache.org/jira/browse/KAFKA-13128
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.1.0
Reporter: A. Sophie Blee-Goldman


h3. Stacktrace

java.lang.AssertionError: Expected: is not null but: was null 
  at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
  at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
  at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
  at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
  at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)

 

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13126) Overflow in joinGroupTimeoutMs when max.poll.interval.ms is MAX_VALUE leads to missing rebalances

2021-07-22 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13126:
--

 Summary: Overflow in joinGroupTimeoutMs when max.poll.interval.ms 
is MAX_VALUE leads to missing rebalances
 Key: KAFKA-13126
 URL: https://issues.apache.org/jira/browse/KAFKA-13126
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: A. Sophie Blee-Goldman
Assignee: A. Sophie Blee-Goldman
 Fix For: 3.1.0


In older versions of Kafka Streams, the {{max.poll.interval.ms}} config was 
overridden by default to {{Integer.MAX_VALUE}}. Even after we removed this 
override, users of both the plain consumer client and kafka streams still set 
the poll interval to MAX_VALUE somewhat often. Unfortunately, this causes an 
overflow when computing the {{joinGroupTimeoutMs}} and results in it being set 
to the {{request.timeout.ms}} instead, which is much lower.

This can easily make consumers drop out of the group, since they must rejoin 
now within 30s (by default) yet are essentially never required to call poll() 
given the high {{max.poll.interval.ms}}. We just need to check for overflow and 
fix it to {{Integer.MAX_VALUE}} when it occurs.
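
An illustrative sketch (not the actual consumer code) of the overflow and an overflow-safe way to clamp the computed timeout:

{code:java}
public class JoinGroupTimeoutOverflowSketch {
    public static void main(String[] args) {
        int maxPollIntervalMs = Integer.MAX_VALUE; // e.g. the old Streams default
        int rebalanceBufferMs = 5_000;             // hypothetical extra slack added on top

        // Naive int addition wraps around to a negative number...
        int overflowed = maxPollIntervalMs + rebalanceBufferMs;

        // ...so widen to long before adding and clamp the result back to Integer.MAX_VALUE.
        int joinGroupTimeoutMs =
            (int) Math.min((long) maxPollIntervalMs + rebalanceBufferMs, Integer.MAX_VALUE);

        System.out.println("overflowed        = " + overflowed);         // negative
        System.out.println("joinGroupTimeoutMs = " + joinGroupTimeoutMs); // Integer.MAX_VALUE
    }
}
{code}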



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13121) Flaky Test TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates()

2021-07-21 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13121:
--

 Summary: Flaky Test 
TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates()
 Key: KAFKA-13121
 URL: https://issues.apache.org/jira/browse/KAFKA-13121
 Project: Kafka
  Issue Type: Bug
  Components: log
Reporter: A. Sophie Blee-Goldman


h4. Stack Trace
{code:java}
org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: No 
resource found for partition: TopicIdPartition{topicId=2B9rDu44TE6c8pLG8A0RAg, 
topicPartition=new-leader-0}
at 
org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.getRemoteLogMetadataCache(RemotePartitionMetadataStore.java:112)
 
at 
org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.listRemoteLogSegments(RemotePartitionMetadataStore.java:98)
at 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.listRemoteLogSegments(TopicBasedRemoteLogMetadataManager.java:212)
at 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates(TopicBasedRemoteLogMetadataManagerTest.java:99){code}
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10921/11/testReport/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13096) QueryableStoreProvider is not updated when threads are added/removed/replaced rendering IQ impossible

2021-07-15 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13096:
--

 Summary: QueryableStoreProvider is not updated when threads are 
added/removed/replaced rendering IQ impossible
 Key: KAFKA-13096
 URL: https://issues.apache.org/jira/browse/KAFKA-13096
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.8.0
Reporter: A. Sophie Blee-Goldman
 Fix For: 3.0.0, 2.8.1


The QueryableStoreProviders class is used to route queries to the correct state 
store on the owning StreamThread, making it a critical piece of IQ. It gets 
instantiated when you create a new KafkaStreams, and is passed a list of 
StreamThreadStateStoreProviders which it then copies and stores. Because it 
only stores a copy, it only ever contains a provider for the StreamThreads that 
were created during the app's startup, and unfortunately is never updated 
during an add/remove/replace thread event. 

This means that IQ can’t get a handle on any stores that belong to a thread 
that wasn’t in the original set. If the app is starting up new threads through 
the #addStreamThread API or following a REPLACE_THREAD event, none of the data 
in any of the stores owned by that new thread will be accessible by IQ. If a 
user is removing threads through #removeStreamThread, or threads die and get 
replaced, you can fall into an endless loop of {{InvalidStateStoreException}} 
from doing a lookup into stores that have been closed since the thread was 
removed/died.

If over time all of the original threads are removed or replaced, then IQ won’t 
work at all.
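
For context, here is a sketch of the typical caller-side IQ lookup loop that ends up retrying forever under this bug; the store name and types are hypothetical:

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class IqRetrySketch {

    // Typical IQ lookup loop: retry while the store is migrating or rebalancing.
    // With the bug described above, a store owned by a new or replaced thread is
    // never registered with the stale provider, so this loop never terminates.
    static ReadOnlyKeyValueStore<String, Long> waitForStore(KafkaStreams streams)
            throws InterruptedException {
        while (true) {
            try {
                return streams.store(StoreQueryParameters.fromNameAndType(
                        "my-store", QueryableStoreTypes.<String, Long>keyValueStore()));
            } catch (InvalidStateStoreException retriable) {
                Thread.sleep(100); // store not available yet; back off and retry
            }
        }
    }
}
{code}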



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12896) Group rebalance loop caused by repeated group leader JoinGroups

2021-07-15 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12896.

Resolution: Fixed

> Group rebalance loop caused by repeated group leader JoinGroups
> ---
>
> Key: KAFKA-12896
> URL: https://issues.apache.org/jira/browse/KAFKA-12896
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.6.0
>Reporter: Lucas Bradstreet
>Assignee: David Jacot
>Priority: Blocker
> Fix For: 3.0.0
>
>
> We encountered a strange case of a rebalance loop with the 
> "cooperative-sticky" assignor. The logs show the following for several hours:
>  
> {{Apr 7, 2021 @ 03:58:36.040  [GroupCoordinator 7]: Stabilized group mygroup 
> generation 19830137 (__consumer_offsets-7)}}
> {{Apr 7, 2021 @ 03:58:35.992  [GroupCoordinator 7]: Preparing to rebalance 
> group mygroup in state PreparingRebalance with old generation 19830136 
> (__consumer_offsets-7) (reason: Updating metadata for member 
> mygroup-1-7ad27e07-3784-4588-97e1-d796a74d4ecc during CompletingRebalance)}}
> {{Apr 7, 2021 @ 03:58:35.988  [GroupCoordinator 7]: Stabilized group mygroup 
> generation 19830136 (__consumer_offsets-7)}}
> {{Apr 7, 2021 @ 03:58:35.972  [GroupCoordinator 7]: Preparing to rebalance 
> group mygroup in state PreparingRebalance with old generation 19830135 
> (__consumer_offsets-7) (reason: Updating metadata for member mygroup during 
> CompletingRebalance)}}
> {{Apr 7, 2021 @ 03:58:35.965  [GroupCoordinator 7]: Stabilized group mygroup 
> generation 19830135 (__consumer_offsets-7)}}
> {{Apr 7, 2021 @ 03:58:35.953  [GroupCoordinator 7]: Preparing to rebalance 
> group mygroup in state PreparingRebalance with old generation 19830134 
> (__consumer_offsets-7) (reason: Updating metadata for member 
> mygroup-7ad27e07-3784-4588-97e1-d796a74d4ecc during CompletingRebalance)}}
> {{Apr 7, 2021 @ 03:58:35.941  [GroupCoordinator 7]: Stabilized group mygroup 
> generation 19830134 (__consumer_offsets-7)}}
> {{Apr 7, 2021 @ 03:58:35.926  [GroupCoordinator 7]: Preparing to rebalance 
> group mygroup in state PreparingRebalance with old generation 19830133 
> (__consumer_offsets-7) (reason: Updating metadata for member mygroup during 
> CompletingRebalance)}}
> Every single time, it was the same member that triggered the JoinGroup and it 
> was always the leader of the group.
> The leader has the privilege of being able to trigger a rebalance by sending 
> `JoinGroup` even if its subscription metadata has not changed. But why would 
> it do so?
> It is possible that this is due to the same issue or a similar bug to 
> https://issues.apache.org/jira/browse/KAFKA-12890.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13081) Port sticky assignor fixes (KAFKA-12984) back to 2.8

2021-07-13 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13081:
--

 Summary: Port sticky assignor fixes (KAFKA-12984) back to 2.8
 Key: KAFKA-13081
 URL: https://issues.apache.org/jira/browse/KAFKA-13081
 Project: Kafka
  Issue Type: Bug
Reporter: A. Sophie Blee-Goldman
 Fix For: 2.8.1


We should make sure that fixes #1 and #2 of 
[#10985|https://github.com/apache/kafka/pull/10985] make it back to the 2.8 
sticky assignor, since it's pretty much impossible to smoothly cherrypick that 
commit from 3.0 to 2.8 due to all the recent improvements and refactoring in 
the AbstractStickyAssignor. Either we can just extract and apply those two 
fixes to 2.8 directly, or go back and port all the commits that made this 
cherrypick difficult over to 2.8 as well. If we do so then cherrypicking the 
original commit should be easy



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13075) Consolidate RocksDBStoreTest and RocksDBKeyValueStoreTest

2021-07-13 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-13075.

Fix Version/s: 3.1.0
   Resolution: Fixed

> Consolidate RocksDBStoreTest and RocksDBKeyValueStoreTest
> -
>
> Key: KAFKA-13075
> URL: https://issues.apache.org/jira/browse/KAFKA-13075
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Chun-Hao Tang
>Priority: Major
>  Labels: newbie, newbie++
> Fix For: 3.1.0
>
>
> Looks like we have two different test classes covering pretty much the same 
> thing: RocksDBStore. It seems like RocksDBKeyValueStoreTest was the original 
> test class for RocksDBStore, but someone later added RocksDBStoreTest, most 
> likely because they didn't notice the RocksDBKeyValueStoreTest which didn't 
> follow the usual naming scheme for test classes. 
> We should consolidate these two into a single file, ideally retaining the 
> RocksDBStoreTest name since that conforms to the test naming pattern used 
> throughout Streams (and so this same thing doesn't happen again). It should 
> also extend AbstractKeyValueStoreTest like the RocksDBKeyValueStoreTest 
> currently does so we continue to get the benefit of all the tests in there as 
> well



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13075) Consolidate RocksDBStore and RocksDBKeyValueStoreTest

2021-07-12 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13075:
--

 Summary: Consolidate RocksDBStore and RocksDBKeyValueStoreTest
 Key: KAFKA-13075
 URL: https://issues.apache.org/jira/browse/KAFKA-13075
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman


Looks like we have two different test classes covering pretty much the same 
thing: RocksDBStore. It seems like RocksDBKeyValueStoreTest was the original 
test class for RocksDBStore, but someone later added RocksDBStoreTest, most 
likely because they didn't notice the RocksDBKeyValueStoreTest which didn't 
follow the usual naming scheme for test classes. 

We should consolidate these two into a single file, ideally retaining the 
RocksDBStoreTest name since that conforms to the test naming pattern used 
throughout Streams (and so this same thing doesn't happen again). It should 
also extend AbstractKeyValueStoreTest like the RocksDBKeyValueStoreTest 
currently does so we continue to get the benefit of all the tests in there as 
well



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12993) Formatting of Streams 'Memory Management' docs is messed up

2021-06-24 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12993:
--

 Summary: Formatting of Streams 'Memory Management' docs is messed 
up 
 Key: KAFKA-12993
 URL: https://issues.apache.org/jira/browse/KAFKA-12993
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman
 Fix For: 3.0.0, 2.8.1


The formatting of this page is all messed up, starting in the RocksDB section. 
It looks like there's a missing closing tag after the example 
BoundedMemoryRocksDBConfig class



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata

2021-06-22 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12984:
--

 Summary: Cooperative sticky assignor can get stuck with invalid 
SubscriptionState input metadata
 Key: KAFKA-12984
 URL: https://issues.apache.org/jira/browse/KAFKA-12984
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: A. Sophie Blee-Goldman
 Fix For: 3.0.0, 2.8.1


Some users have reported seeing their consumer group become stuck in the 
CompletingRebalance phase when using the cooperative-sticky assignor. Based on 
the request metadata we were able to deduce that multiple consumers were 
reporting the same partition(s) in their "ownedPartitions" field of the 
consumer protocol. Since this is an invalid state, the input causes the 
cooperative-sticky assignor to detect that something is wrong and throw an 
IllegalStateException. If the consumer application is set up to simply retry, 
this will cause the group to appear to hang in the rebalance state.

The "ownedPartitions" field is encoded based on the ConsumerCoordinator's 
SubscriptionState, which was assumed to always be up to date. However there may 
be cases where the consumer has dropped out of the group but fails to clear the 
SubscriptionState, allowing it to report some partitions as owned that have 
since been reassigned to another member.

We should (a) fix the sticky assignment algorithm to resolve cases of improper 
input conditions by invalidating the "ownedPartitions" in cases of double 
ownership, and (b) shore up the ConsumerCoordinator logic to better handle 
rejoining the group and keeping its internal state consistent. See KAFKA-12983 
for more details on (b)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12983) onJoinPrepare is not always invoked before joining the group

2021-06-22 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12983:
--

 Summary: onJoinPrepare is not always invoked before joining the 
group
 Key: KAFKA-12983
 URL: https://issues.apache.org/jira/browse/KAFKA-12983
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: A. Sophie Blee-Goldman
 Fix For: 3.0.0, 2.8.1


As the title suggests, the #onJoinPrepare callback is not always invoked before 
a member (re)joins the group, but only once when it first enters the rebalance. 
This means that any updates or events that occur during the join phase can be 
lost in the internal state: for example, clearing the SubscriptionState (and 
thus the "ownedPartitions" that are used for cooperative rebalancing) after 
losing its memberId during a rebalance.

We should reset the `needsJoinPrepare` flag inside the resetStateAndRejoin() 
method



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12936) In-memory stores are always restored from scratch after dropping out of the group

2021-06-10 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12936:
--

 Summary: In-memory stores are always restored from scratch after 
dropping out of the group
 Key: KAFKA-12936
 URL: https://issues.apache.org/jira/browse/KAFKA-12936
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


Whenever an in-memory store is closed, the actual store contents are garbage 
collected and the state will need to be restored from scratch if the task is 
reassigned and re-initialized. We introduced the recycling feature to prevent 
this from occurring when a task is transitioned from standby to active (or vice 
versa), but it's still possible for the in-memory state to be unnecessarily 
wiped out in the case the member has dropped out of the group. In this case, 
the onPartitionsLost callback is invoked, which will close all active tasks as 
dirty before the member rejoins the group. This means that all these tasks will 
need to be restored from scratch if they are reassigned back to this consumer.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12920) Consumer's cooperative sticky assignor need to clear generation / assignment data upon `onPartitionsLost`

2021-06-09 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12920.

Resolution: Not A Bug

> Consumer's cooperative sticky assignor need to clear generation / assignment 
> data upon `onPartitionsLost`
> -
>
> Key: KAFKA-12920
> URL: https://issues.apache.org/jira/browse/KAFKA-12920
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: bug, consumer
>
> Consumer's cooperative-sticky assignor does not track the owned partitions 
> inside the assignor --- i.e. when it resets its state in the event of 
> ``onPartitionsLost``, the ``memberAssignment`` and ``generation`` inside the 
> assignor would not be cleared. This would cause a member to join with empty 
> generation on the protocol while with non-empty user-data encoding the old 
> assignment still (and hence pass the validation check on broker side during 
> JoinGroup), and eventually cause a single partition to be assigned to 
> multiple consumers within a generation.
> We should let the assignor also clear its assignment/generation when 
> ``onPartitionsLost`` is triggered in order to avoid this scenario.
> Note that 1) for the regular sticky assignor the generation would still have 
> an older value, and this would cause the previously owned partitions to be 
> discarded during the assignment, and 2) for Streams' sticky assignor, its 
> encoding would indeed be cleared along with ``onPartitionsLost``. Hence only 
> Consumer's cooperative-sticky assignor have this issue to solve.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12851) Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable

2021-05-26 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12851:
--

 Summary: Flaky Test 
RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable
 Key: KAFKA-12851
 URL: https://issues.apache.org/jira/browse/KAFKA-12851
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: A. Sophie Blee-Goldman
 Fix For: 3.0.0


Failed twice on a [PR 
build|https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10755/6/testReport/]
h3. Stacktrace

org.opentest4j.AssertionFailedError: expected:  but was: 
  at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
  at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
  at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35)
  at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162)
  at org.apache.kafka.raft.RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable(RaftEventSimulationTest.java:263)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12849) Consider migrating TaskMetadata to interface with internal implementation

2021-05-25 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12849:
--

 Summary: Consider migrating TaskMetadata to interface with 
internal implementation
 Key: KAFKA-12849
 URL: https://issues.apache.org/jira/browse/KAFKA-12849
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman


In KIP-740 we had to go through a deprecation cycle in order to change the 
constructor from the original one which accepted the taskId parameter as a 
string, to the new one which takes a TaskId object directly. We had considered 
just changing the signature directly without deprecation, as this was never 
intended to be instantiated by users; rather, it just acts as a pass-through 
metadata class. Almost by definition, if there is no reason to ever instantiate 
it, it may be better suited as a public interface with the implementation and 
constructor as internal APIs.
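
A minimal sketch of the interface-plus-internal-implementation shape being suggested here; the names below are illustrative only, not the actual classes:

{code:java}
// Public, read-only surface exposed to users.
public interface TaskMetadataView {
    String taskId();
}

// Internal implementation; users never need to construct it themselves.
class InternalTaskMetadata implements TaskMetadataView {
    private final String taskId;

    InternalTaskMetadata(final String taskId) {
        this.taskId = taskId;
    }

    @Override
    public String taskId() {
        return taskId;
    }
}
{code}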



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12848) Add some basic benchmarks for Kafka Streams

2021-05-25 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12848:
--

 Summary: Add some basic benchmarks for Kafka Streams
 Key: KAFKA-12848
 URL: https://issues.apache.org/jira/browse/KAFKA-12848
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman


As the title suggests, we often want to test out improvements or verify that a 
bugfix does not introduce a serious regression. While there are existing 
benchmarks that are run for quality assurance by various contributors, there 
are no publicly available benchmarks for Kafka Streams in AK itself.

It would be great if we had a simple jmh suite (or something) with various 
Streams features which could be run on a one-off basis by developers.
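
A rough sketch of what such a one-off JMH micro-benchmark could look like, using TopologyTestDriver so no broker is needed; the topology and names are hypothetical and this is not an existing benchmark in AK:

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

@State(Scope.Thread)
public class SimpleTopologyBenchmark {

    private TopologyTestDriver driver;
    private TestInputTopic<String, String> input;
    private long counter;

    @Setup
    public void setup() {
        // Trivial stateless topology: read, uppercase, write.
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(v -> v.toUpperCase())
               .to("output", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "jmh-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        driver = new TopologyTestDriver(builder.build(), props);
        input = driver.createInputTopic("input", new StringSerializer(), new StringSerializer());
    }

    @Benchmark
    public void pipeOneRecord() {
        // Measures the cost of pushing a single record through the topology.
        input.pipeInput("key-" + (counter++ % 1_000), "value");
    }

    @TearDown
    public void tearDown() {
        driver.close();
    }
}
{code}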



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2021-05-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reopened KAFKA-9295:
---

Guess there is still something else going on here. At this point I think we 
can mostly rule out fiddling with the configs but I don't have any guesses on 
where to look next. It would be nice if we could get real logs from a run that 
reproduced this, but unfortunately all the actual Streams content is truncated.

[~showuon] maybe you can look into turning the zookeeper and kafka logs down to 
WARN or even ERROR so that we have some hope of viewing the relevant parts of 
the logs? I tried to do that a while back but clearly it didn't work, and I 
didn't have time to revisit it

> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> --
>
> Key: KAFKA-9295
> URL: https://issues.apache.org/jira/browse/KAFKA-9295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0, 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/]
> {quote}java.lang.AssertionError: Did not receive all 1 records from topic 
> output- within 6 ms Expected: is a value equal to or greater than <1> 
> but: <0> was less than <1> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12844) KIP-740 follow up: clean up TaskId

2021-05-24 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12844:
--

 Summary: KIP-740 follow up: clean up TaskId
 Key: KAFKA-12844
 URL: https://issues.apache.org/jira/browse/KAFKA-12844
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: A. Sophie Blee-Goldman
 Fix For: 4.0.0


See 
[KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557]
 – for the TaskId class, we need to remove the following deprecated APIs:
 # The public partition and topicGroupId fields should be "removed", ie made 
private (can also now rename topicGroupId to subtopology to match the getter)
 # The two #readFrom and two #writeTo methods can be removed (they have already 
been converted to internal utility methods we now use instead, so just remove 
them)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12843) KIP-740 follow up: clean up TaskMetadata

2021-05-24 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12843:
--

 Summary: KIP-740 follow up: clean up TaskMetadata
 Key: KAFKA-12843
 URL: https://issues.apache.org/jira/browse/KAFKA-12843
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: A. Sophie Blee-Goldman
 Fix For: 4.0.0


See 
[KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557]
 – for the TaskMetadata class, we need to:
 # Deprecate the TaskMetadata#getTaskId method
 # "Remove" the deprecated TaskMetadata#taskId method, then re-add a taskId() 
API that returns a TaskId instead of a String
 # Remove the deprecated constructor



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12779) TaskMetadata should return actual TaskId rather than plain String

2021-05-20 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12779.

Resolution: Fixed

> TaskMetadata should return actual TaskId rather than plain String
> -
>
> Key: KAFKA-12779
> URL: https://issues.apache.org/jira/browse/KAFKA-12779
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> Not sure why this was encoded as a String field instead of using the public 
> TaskId class. We should use an actual TaskId object, especially as we may add 
> additional fields that increase the complexity of parsing the taskId.
> [KIP-740: Use TaskId instead of String for the taskId field in 
> TaskMetadata|https://cwiki.apache.org/confluence/display/KAFKA/KIP-740%3A+Use+TaskId+instead+of+String+for+the+taskId+field+in+TaskMetadata]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2021-05-19 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-9295.
---
Resolution: Fixed

> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> --
>
> Key: KAFKA-9295
> URL: https://issues.apache.org/jira/browse/KAFKA-9295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0, 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/]
> {quote}java.lang.AssertionError: Did not receive all 1 records from topic 
> output- within 6 ms Expected: is a value equal to or greater than <1> 
> but: <0> was less than <1> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12754) TaskMetadata endOffsets does not update when the offsets are read

2021-05-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12754.

Fix Version/s: 3.0.0
   Resolution: Fixed

> TaskMetadata endOffsets does not update when the offsets are read
> -
>
> Key: KAFKA-12754
> URL: https://issues.apache.org/jira/browse/KAFKA-12754
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
> Fix For: 3.0.0
>
>
> The high water mark in StreamTask is not updated optimally. Also it would be 
> good to have the metadata offsets start with an initial value of -1 instead of an 
> empty map, so that the set of TopicPartitions won't change.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12782) Javadocs search sends you to a non-existent URL

2021-05-13 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12782:
--

 Summary: Javadocs search sends you to a non-existent URL
 Key: KAFKA-12782
 URL: https://issues.apache.org/jira/browse/KAFKA-12782
 Project: Kafka
  Issue Type: Bug
  Components: docs
Reporter: A. Sophie Blee-Goldman


I was looking up a class using the javadocs search functionality, and clicked 
on the link when TaskId came up, but it sent me to 
[https://kafka.apache.org/28/javadoc/undefined/org/apache/kafka/streams/processor/TaskId.html],
 which does not exist.

I noticed the URL had an odd "undefined" term inserted before the package name, 
so I took that out and was able to find the [correct 
javadocs|https://kafka.apache.org/28/javadoc/org/apache/kafka/streams/processor/TaskId.html].
 So the search seems to be broken due to this "undefined" term that's being 
injected somewhere, for some reason.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8326) Add Serde<List> support

2021-05-13 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-8326.
---
Fix Version/s: 3.0.0
   Resolution: Fixed

> Add Serde<List> support
> --
>
> Key: KAFKA-8326
> URL: https://issues.apache.org/jira/browse/KAFKA-8326
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Daniyar Yeralin
>Assignee: Daniyar Yeralin
>Priority: Minor
>  Labels: kip
> Fix For: 3.0.0
>
>
> _This ticket proposes adding new {color:#4c9aff}ListSerializer{color} and 
> {color:#4c9aff}ListDeserializer{color} classes as well as support for the new 
> classes into the Serdes class. This will allow using a List Serde of type_ 
> {color:#4c9aff}_Serde<List<Inner>>_{color} _directly from Consumers, 
> Producers and Streams._
> _{color:#4c9aff}Serde<List<Inner>>{color} serialization and deserialization 
> will be done through repeatedly calling a serializer/deserializer for each 
> entry provided by the passed generic {color:#4c9aff}Inner{color}'s Serde. For 
> example, if you want to create a List of Strings serde, then the 
> serializer/deserializer of StringSerde will be used to serialize/deserialize 
> each entry in the {color:#4c9aff}List<String>{color}._
> I believe there are many use cases where List Serde could be used:
>  * 
> [https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows]
>  * 
> [https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api]
> For instance, aggregate grouped (by key) values together in a list to do 
> other subsequent operations on the collection.
> KIP Link: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization]
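
A hedged sketch of the delegation approach described above: a list serializer that simply loops over the entries and hands each one to the inner type's serializer, length-prefixing each entry (simplified, not the implementation that was merged):

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import org.apache.kafka.common.serialization.Serializer;

public class SketchListSerializer<Inner> implements Serializer<List<Inner>> {

    private final Serializer<Inner> inner;

    public SketchListSerializer(final Serializer<Inner> inner) {
        this.inner = inner;
    }

    @Override
    public byte[] serialize(final String topic, final List<Inner> data) {
        if (data == null) {
            return null;
        }
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(data.size());
            for (final Inner entry : data) {
                // Delegate each entry to the inner serializer, prefixing its length
                // so a matching deserializer can split the entries back apart.
                final byte[] serialized = inner.serialize(topic, entry);
                out.writeInt(serialized.length);
                out.write(serialized);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
{code}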



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

