[jira] [Created] (KAFKA-15129) Clean up all metrics that were forgotten to be closed
hudeqi created KAFKA-15129: -- Summary: Clean up all metrics that were forgotten to be closed Key: KAFKA-15129 URL: https://issues.apache.org/jira/browse/KAFKA-15129 Project: Kafka Issue Type: Improvement Components: controller, core, log Affects Versions: 3.5.0 Reporter: hudeqi Assignee: hudeqi -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15129) Clean up all metrics that were forgotten to be closed
[ https://issues.apache.org/jira/browse/KAFKA-15129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hudeqi updated KAFKA-15129: --- Description: In the current Kafka code, many module metrics are still not closed when their owning component stops, although some cases have already been fixed, such as KAFKA-14866 and KAFKA-14868. Here I will find all the metrics in the current version that are never closed and submit a subtask for each of them in order to fix them. > Clean up all metrics that were forgotten to be closed > - > > Key: KAFKA-15129 > URL: https://issues.apache.org/jira/browse/KAFKA-15129 > Project: Kafka > Issue Type: Improvement > Components: controller, core, log >Affects Versions: 3.5.0 >Reporter: hudeqi >Assignee: hudeqi >Priority: Major > > In the current Kafka code, many module metrics are still not closed when > their owning component stops, although some cases have already been fixed, > such as KAFKA-14866 and KAFKA-14868. > Here I will find all the metrics in the current version that are never closed > and submit a subtask for each of them in order to fix them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
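The pattern behind these subtasks is simple: every metric registered when a component starts needs a matching removal when it stops. Below is a minimal sketch of that pattern using the Yammer Metrics API that Kafka's server-side metrics are built on; the class, group, and metric names are illustrative assumptions, not the actual ones from the subtasks.

```
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;

// Illustrative component that registers a gauge on startup and removes it on shutdown.
public class CleanerMetrics implements AutoCloseable {
    private final MetricsRegistry registry = Metrics.defaultRegistry();
    private final MetricName maxDirtyPercent =
        new MetricName("kafka.log", "LogCleanerManager", "max-dirty-percent");

    public CleanerMetrics() {
        registry.newGauge(maxDirtyPercent, new Gauge<Integer>() {
            @Override
            public Integer value() {
                return 0; // placeholder for the real computation
            }
        });
    }

    @Override
    public void close() {
        // The fix tracked by this ticket: deregister every metric when the
        // owning component stops, so stale gauges do not linger in the registry.
        registry.removeMetric(maxDirtyPercent);
    }
}
```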
[GitHub] [kafka] tinaselenge commented on pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords
tinaselenge commented on PR #13760: URL: https://github.com/apache/kafka/pull/13760#issuecomment-1610960042 Thank you @showuon for the feedback. I think I have addressed the comments. Please let me know if there is anything I missed or anything that doesn't look right. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15130) Delete remote segments when deleting a topic
Lan Ding created KAFKA-15130: Summary: Delete remote segments when deleting a topic Key: KAFKA-15130 URL: https://issues.apache.org/jira/browse/KAFKA-15130 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 3.5.0, 3.4.0 Reporter: Lan Ding Assignee: Lan Ding When tiered storage is enabled and {{delete.topic.enable=true}}, deleting a topic should also delete the corresponding segments of that topic on the remote system, and cancel the RLMTask for that topic. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] erikvanoosten commented on pull request #13914: KAFKA-14972: Support async runtimes in consumer
erikvanoosten commented on PR #13914: URL: https://github.com/apache/kafka/pull/13914#issuecomment-1610983180 KIP-944 has been created: https://cwiki.apache.org/confluence/x/chw0Dw -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738014#comment-17738014 ] Erik van Oosten commented on KAFKA-14972: - KIP-944 https://cwiki.apache.org/confluence/x/chw0Dw > Make KafkaConsumer usable in async runtimes > --- > > Key: KAFKA-14972 > URL: https://issues.apache.org/jira/browse/KAFKA-14972 > Project: Kafka > Issue Type: Wish > Components: consumer >Reporter: Erik van Oosten >Assignee: Erik van Oosten >Priority: Major > Labels: needs-kip > > KafkaConsumer contains a check that rejects nested invocations from different > threads (method {{acquire}}). For users that use an async runtime, this > is an almost impossible requirement. Examples of async runtimes that are > affected are Kotlin co-routines (see KAFKA-7143) and Zio. > We propose to replace the thread-id check with an access-id that is stored on > a thread-local variable. Existing programs will not be affected. Developers > that work in an async runtime can pick up the access-id and set it on the > thread-local variable in a thread of their choosing. > Every time a callback is invoked a new access-id is generated. When the > callback completes, the previous access-id is restored. > This proposal does not make it impossible to use the client incorrectly. > However, we think it strikes a good balance between making correct usage from > an async runtime possible while making incorrect usage difficult. > Alternatives considered: > # Configuration that switches off the check completely. -- This message was sent by Atlassian Jira (v8.20.10#820010)
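A minimal sketch of the access-id idea described above; all class, field, and method names here are assumptions for illustration, not the actual KafkaConsumer code:

```
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: replace the thread-id ownership check with a transferable access-id.
public final class ConsumerAccess {
    private static final ThreadLocal<Long> ACCESS_ID = new ThreadLocal<>();
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);

    // Called on entry to every public consumer method, like acquire() today.
    void acquire() {
        long callerId = currentAccessId();
        if (!owner.compareAndSet(NO_OWNER, callerId) && owner.get() != callerId) {
            throw new ConcurrentModificationException(
                "KafkaConsumer is not safe for multi-threaded access");
        }
    }

    void release() {
        owner.set(NO_OWNER);
    }

    // Falls back to the thread id, so existing single-threaded programs are unaffected.
    private static long currentAccessId() {
        Long id = ACCESS_ID.get();
        return id != null ? id : Thread.currentThread().getId();
    }

    // An async runtime would call this on whichever thread should take over access.
    public static void adoptAccess(long accessId) {
        ACCESS_ID.set(accessId);
    }
}
```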
[jira] [Updated] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten updated KAFKA-14972: Description: KafkaConsumer contains a check that rejects nested invocations from different threads (method {{acquire}}). For users that use an async runtime, this is an almost impossible requirement. Examples of async runtimes that are affected are Kotlin co-routines (see KAFKA-7143) and Zio. It should be possible for a thread to pass on its capability to access the consumer to another thread. See KIP-944 for a proposal and was: KafkaConsumer contains a check that rejects nested invocations from different threads (method {{acquire}}). For users that use an async runtime, this is an almost impossible requirement. Examples of async runtimes that are affected are Kotlin co-routines (see KAFKA-7143) and Zio. We propose to replace the thread-id check with an access-id that is stored on a thread-local variable. Existing programs will not be affected. Developers that work in an async runtime can pick up the access-id and set it on the thread-local variable in a thread of their choosing. Every time a callback is invoked a new access-id is generated. When the callback completes, the previous access-id is restored. This proposal does not make it impossible to use the client incorrectly. However, we think it strikes a good balance between making correct usage from an async runtime possible while making incorrect usage difficult. Alternatives considered: # Configuration that switches off the check completely. > Make KafkaConsumer usable in async runtimes > --- > > Key: KAFKA-14972 > URL: https://issues.apache.org/jira/browse/KAFKA-14972 > Project: Kafka > Issue Type: Wish > Components: consumer >Reporter: Erik van Oosten >Assignee: Erik van Oosten >Priority: Major > Labels: needs-kip > > KafkaConsumer contains a check that rejects nested invocations from different > threads (method {{acquire}}). For users that use an async runtime, this > is an almost impossible requirement. Examples of async runtimes that are > affected are Kotlin co-routines (see KAFKA-7143) and Zio. > It should be possible for a thread to pass on its capability to access the > consumer to another thread. See KIP-944 for a proposal and -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten updated KAFKA-14972: Description: KafkaConsumer contains a check that rejects nested invocations from different threads (method {{acquire}}). For users that use an async runtime, this is an almost impossible requirement. Examples of async runtimes that are affected are Kotlin co-routines (see KAFKA-7143) and Zio. It should be possible for a thread to pass on its capability to access the consumer to another thread. See [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and [https://github.com/apache/kafka/pull/13914] for an implementation. was: KafkaConsumer contains a check that rejects nested invocations from different threads (method {{acquire}}). For users that use an async runtime, this is an almost impossible requirement. Examples of async runtimes that are affected are Kotlin co-routines (see KAFKA-7143) and Zio. It should be possible for a thread to pass on its capability to access the consumer to another thread. See KIP-944 for a proposal and > Make KafkaConsumer usable in async runtimes > --- > > Key: KAFKA-14972 > URL: https://issues.apache.org/jira/browse/KAFKA-14972 > Project: Kafka > Issue Type: Wish > Components: consumer >Reporter: Erik van Oosten >Assignee: Erik van Oosten >Priority: Major > Labels: needs-kip > > KafkaConsumer contains a check that rejects nested invocations from different > threads (method {{acquire}}). For users that use an async runtime, this > is an almost impossible requirement. Examples of async runtimes that are > affected are Kotlin co-routines (see KAFKA-7143) and Zio. > It should be possible for a thread to pass on its capability to access the > consumer to another thread. See > [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and > [https://github.com/apache/kafka/pull/13914] for an implementation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15130) Delete remote segments when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-15130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lan Ding updated KAFKA-15130: - Description: When tiered storage is enabled and {{delete.topic.enable=true}}, deleting a topic should also delete the corresponding segments of that topic on the remote system, and cancel the RLMTask for that topic. (was: When tired storage is enabled and {{delete.topic.enable=true}} , deleting a topic should also delete the corresponding segments of that topic on the remote system, and cancel the RLMTask for that topic.) > Delete remote segments when deleting a topic > -- > > Key: KAFKA-15130 > URL: https://issues.apache.org/jira/browse/KAFKA-15130 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 3.4.0, 3.5.0 >Reporter: Lan Ding >Assignee: Lan Ding >Priority: Major > > When tiered storage is enabled and {{delete.topic.enable=true}}, deleting a > topic should also delete the corresponding segments of that topic on the > remote system, and cancel the RLMTask for that topic. -- This message was sent by Atlassian Jira (v8.20.10#820010)
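For reference, the plugin-level calls involved in such a cleanup would look roughly like the sketch below. The method names come from the existing RemoteStorageManager and RemoteLogMetadataManager interfaces, but the surrounding flow is an assumption, not the final implementation:

```
import java.util.Iterator;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;

// Sketch: on topic deletion, drop every remote segment of the partition.
static void deleteRemoteData(TopicIdPartition partition,
                             RemoteLogMetadataManager rlmm,
                             RemoteStorageManager rsm) throws RemoteStorageException {
    Iterator<RemoteLogSegmentMetadata> segments = rlmm.listRemoteLogSegments(partition);
    while (segments.hasNext()) {
        // Deletes the segment data and its associated indexes from remote storage.
        rsm.deleteLogSegmentData(segments.next());
    }
    // The RLMTask scheduled for this partition would also be cancelled here.
}
```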
[jira] [Created] (KAFKA-15131) Improve RemoteStorageManager exception handling
Jorge Esteban Quilcate Otoya created KAFKA-15131: Summary: Improve RemoteStorageManager exception handling Key: KAFKA-15131 URL: https://issues.apache.org/jira/browse/KAFKA-15131 Project: Kafka Issue Type: Improvement Components: core Reporter: Jorge Esteban Quilcate Otoya Assignee: Jorge Esteban Quilcate Otoya As discussed here [1], the RemoteStorageManager javadocs require clarification regarding error handling: * Remove ambiguity on `RemoteResourceNotFoundException` description * Describe when `RemoteResourceNotFoundException` can/should be thrown * Describe expectations for idempotent operations when copying/deleting [1] https://issues.apache.org/jira/browse/KAFKA-7739?focusedCommentId=17720936&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17720936 -- This message was sent by Atlassian Jira (v8.20.10#820010)
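To make the intent concrete, the clarified contract could read roughly as follows. The signature is the existing one from RemoteStorageManager; the wording is illustrative only, not the final javadoc:

```
/**
 * Fetches the log segment data for the given {@link RemoteLogSegmentMetadata}.
 *
 * @throws RemoteResourceNotFoundException if the requested log segment is not
 *         found in the remote storage (and only then), so that callers can
 *         distinguish a genuinely missing resource from a transient failure.
 * @throws RemoteStorageException if there is any other error while fetching
 *         the segment; callers may retry these.
 */
InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                            int startPosition,
                            int endPosition) throws RemoteStorageException;
```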
[GitHub] [kafka] jeqo opened a new pull request, #13923: KAFKA-15131: Improve RemoteStorageManager exception handling
jeqo opened a new pull request, #13923: URL: https://github.com/apache/kafka/pull/13923 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14993) Improve TransactionIndex instance handling while copying to and fetching from RSM.
[ https://issues.apache.org/jira/browse/KAFKA-14993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738033#comment-17738033 ] Jorge Esteban Quilcate Otoya commented on KAFKA-14993: -- [~ckamal] just checking whether you have made any progress on this one already. If it's still open, I'd like to help contribute this fix. Let me know, cheers! > Improve TransactionIndex instance handling while copying to and fetching from > RSM. > -- > > Key: KAFKA-14993 > URL: https://issues.apache.org/jira/browse/KAFKA-14993 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Satish Duggana >Assignee: Kamal Chandraprakash >Priority: Major > > RSM should throw a ResourceNotFoundException if it does not have > TransactionIndex. Currently, it expects an empty InputStream and creates an > unnecessary file in the cache. This can be avoided by catching > ResourceNotFoundException and not creating an instance. There are minor > cleanups needed in RemoteIndexCache and other TransactionIndex usages. -- This message was sent by Atlassian Jira (v8.20.10#820010)
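A sketch of the caller-side handling the issue proposes; the fetchIndex call and IndexType come from the existing RemoteStorageManager API, while the surrounding variables are assumed:

```
// Treat a missing transaction index as "no aborted transactions" instead of
// materializing an empty file in the RemoteIndexCache.
InputStream txnIndexStream;
try {
    txnIndexStream = rsm.fetchIndex(segmentMetadata, RemoteStorageManager.IndexType.TRANSACTION);
} catch (RemoteResourceNotFoundException e) {
    txnIndexStream = null; // skip creating a TransactionIndex instance for this segment
}
```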
[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christo Lolov updated KAFKA-14133: -- Description: {color:#de350b}There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.{color} Unless stated in brackets the tests are in the streams module. A list of tests which, as of 2nd of August 2022, still need to be moved from EasyMock to Mockito, have no Jira issue, and have no open pull requests that I am aware of:
{color:#ff8b00}In Review{color} {color:#00875a}Merged{color}
# {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
# {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] )
# {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] )
# {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: [~yash.mayya] )
# {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo)
# {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo)
# {color:#00875a}KStreamPrintTest{color} (owner: Christo)
# {color:#00875a}KStreamRepartitionTest{color} (owner: Christo)
# {color:#00875a}MaterializedInternalTest{color} (owner: Christo)
# {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo)
# {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
# {color:#00875a}ClientUtilsTest{color} (owner: Christo)
# {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: Christo)
# {color:#00875a}TopologyTest{color} (owner: Christo)
# {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo)
# {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
# {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: Christo)
# {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
# {color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
# {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo)
# {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: Christo)
# {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
# {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo)
# {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo)
# {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo)
# {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
# {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: Christo)
# {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
# {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo)
# {color:#00875a}RocksDBStoreTest{color} (owner: Christo)
# {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo)
# {color:#ff8b00}TaskManagerTest{color} (owner: Christo)
# {color:#00875a}InternalTopicManagerTest{color} (owner: Christo)
# {color:#00875a}ProcessorContextImplTest{color} (owner: Christo)
# {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo)
# {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo)
# {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo)
# {color:#00875a}AssignmentTestUtils{color} (owner: Christo)
# {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew) (takeover: Christo)
# {color:#ff8b00}StandbyTaskTest{color} (owner: Matthew)
# {color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew)
# {color:#ff8b00}StreamTaskTest{color} (owner: Matthew)
# {color:#ff8b00}StreamThreadTest{color} (owner: Matthew)
# {color:#ff8b00}StreamsMetricsImplTest{color} (owner: Dalibor) (Captured in https://issues.apache.org/jira/browse/KAFKA-12947)
# {color:#00875a}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: [~shekharrajak])
# {color:#00875a}TimeOrderedWindowStoreTest{color} (owners: [~shekharrajak]) [https://github.com/apache/kafka/pull/12777]
*The coverage report for the above tests after the change should be >= what the coverage is now.*
was: {color:#de350b}There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.{color} Unless stated in brackets the tests are in the streams module. A list of tests which, as of 2nd of August 2022, still need to be moved from EasyMock to Mockito, have no Jira issue, and have no open pull requests that I am aware of:
{color:#ff8b00}In Review{color} {color:#00875a}Merged{color}
# {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
# {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] )
# {color:#00875a}RootResourceTest{color} (connect) (owner:
[jira] [Commented] (KAFKA-14993) Improve TransactionIndex instance handling while copying to and fetching from RSM.
[ https://issues.apache.org/jira/browse/KAFKA-14993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738035#comment-17738035 ] Kamal Chandraprakash commented on KAFKA-14993: -- [~jeqo] I haven't started to work on this but will raise the patch soon. Will add you as a reviewer for the patch. Thanks! > Improve TransactionIndex instance handling while copying to and fetching from > RSM. > -- > > Key: KAFKA-14993 > URL: https://issues.apache.org/jira/browse/KAFKA-14993 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Satish Duggana >Assignee: Kamal Chandraprakash >Priority: Major > > RSM should throw a ResourceNotFoundException if it does not have > TransactionIndex. Currently, it expects an empty InputStream and creates an > unnecessary file in the cache. This can be avoided by catching > ResourceNotFoundException and not creating an instance. There are minor > cleanups needed in RemoteIndexCache and other TransactionIndex usages. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jeqo commented on pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm
jeqo commented on PR #13828: URL: https://github.com/apache/kafka/pull/13828#issuecomment-1611155345 @showuon I'm trying to test this, but TBRLMM is still complaining about missing bootstrap.servers, even when listener name is provided: ``` kafka-ts | [2023-06-28 10:19:04,131] INFO Initializing the resources. (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager) kafka-ts | [2023-06-28 10:19:04,141] ERROR Uncaught exception in thread 'RLMMInitializationThread': (org.apache.kafka.common.utils.KafkaThread) kafka-ts | org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value. kafka-ts | at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:496) kafka-ts | at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:486) kafka-ts | at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:112) kafka-ts | at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:145) kafka-ts | at org.apache.kafka.clients.admin.AdminClientConfig.(AdminClientConfig.java:244) kafka-ts | at org.apache.kafka.clients.admin.Admin.create(Admin.java:144) kafka-ts | at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:49) kafka-ts | at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.initializeResources(TopicBasedRemoteLogMetadataManager.java:366) kafka-ts | at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.lambda$configure$1(TopicBasedRemoteLogMetadataManager.java:352) kafka-ts | at java.base/java.lang.Thread.run(Thread.java:829) ``` Looking at the code, I can see listener name being passed, ``` kafka-ts | remote.log.metadata.manager.class.name = org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager kafka-ts | remote.log.metadata.manager.class.path = null kafka-ts | remote.log.metadata.manager.impl.prefix = rlmm.config. kafka-ts | remote.log.metadata.manager.listener.name = BROKER ``` but when initializing the resources, properties without the right prefix are ignored: https://github.com/apache/kafka/blob/f32ebeab17ce574660669873402a7f40927d0492/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java#L136-L159 Let me know if I'm reading this properly to create an issue, otherwise I may be missing something. Many thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
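For context on the prefix filtering, broker-side overrides intended for the TBRLMM's internal clients have to carry the configured prefix before they are passed through, roughly like this (the exact key names here are an assumption shown only to illustrate the prefixing; whether bootstrap.servers should be required at all when the listener name is set is exactly the question raised above):

```
remote.log.metadata.manager.listener.name=BROKER
# Keys without the "rlmm.config." prefix are filtered out before they reach the
# TopicBasedRemoteLogMetadataManager:
rlmm.config.remote.log.metadata.common.client.bootstrap.servers=broker:9092
```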
[jira] [Updated] (KAFKA-15131) Improve RemoteStorageManager exception handling documentation
[ https://issues.apache.org/jira/browse/KAFKA-15131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jorge Esteban Quilcate Otoya updated KAFKA-15131: - Summary: Improve RemoteStorageManager exception handling documentation (was: Improve RemoteStorageManager exception handling) > Improve RemoteStorageManager exception handling documentation > - > > Key: KAFKA-15131 > URL: https://issues.apache.org/jira/browse/KAFKA-15131 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > Labels: tiered-storage > > As discussed here[1], RemoteStorageManager javadocs requires clarification > regarding error handling: > * Remove ambiguity on `RemoteResourceNotFoundException` description > * Describe when `RemoteResourceNotFoundException` can/should be thrown > * Describe expectations for idempotent operations when copying/deleting > > [1] > https://issues.apache.org/jira/browse/KAFKA-7739?focusedCommentId=17720936&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17720936 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] joobisb commented on a diff in pull request #13862: KAFKA-15050: format the prompts in the quickstart
joobisb commented on code in PR #13862: URL: https://github.com/apache/kafka/pull/13862#discussion_r1245057900 ## docs/quickstart.html: ## @@ -32,7 +32,7 @@ the latest Kafka release and extract it: -$ tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz +$ tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz Review Comment: https://github.com/apache/kafka/assets/13068144/e4285029-5a1c-4002-8ed6-f8a95c97e2a0 If I remove the `language-bash` class from `` it will lose the associated properties as shown in the first image above -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] joobisb commented on a diff in pull request #13862: KAFKA-15050: format the prompts in the quickstart
joobisb commented on code in PR #13862: URL: https://github.com/apache/kafka/pull/13862#discussion_r1245058126 ## docs/quickstart.html: ## @@ -154,9 +154,9 @@ By default, each line you enter will result in a separate event being written to the topic. -$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 -This is my first event -This is my second event +$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 +$ This is my first event +$ This is my second event Review Comment: will update these -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15116) Kafka Streams processing blocked during rebalance
[ https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738060#comment-17738060 ] David Gammon commented on KAFKA-15116: -- Hi [~mjsax], please see my responses below: # Message A uses an internal store to store information about the entity. The store knows that there is a pending event that is yet to be committed so it blocks until it is committed. The problem happens when Message B (which has a processor that uses the store) tries to get information on its entity. It will block and timeout because Message A hasn't been committed. # I think our scenario is specifically *during* a rebalance. I've seen code that says if the taskManager is rebalancing then do not commit. # This is more to do with our store and how long it takes before it times out. The timeout then can impact the transaction timeout and producers get fenced etc. # The fix is to add a check for rebalancing in the while loop in runOnce. This checks if a rebalance is in progress and sets the numIterations to 0 to stop processing of messages. When it has rebalanced it sets numIterations back to 1. # Again I think we are talking about *during* a rebalance rather than before. Thanks, David > Kafka Streams processing blocked during rebalance > - > > Key: KAFKA-15116 > URL: https://issues.apache.org/jira/browse/KAFKA-15116 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.0 >Reporter: David Gammon >Priority: Major > > We have a Kafka Streams application that simply takes a message, processes > it and then produces an event out the other side. The complexity is that > there is a requirement that all events with the same partition key must be > committed before the next message is processed. > This works most of the time flawlessly but we have started to see problems > during deployments where the first message blocks the second message during a > rebalance because the first message isn’t committed before the second message > is processed. This ultimately results in transactions timing out and more > rebalancing. > We’ve tried lots of configuration to get the behaviour we require with no > luck. We’ve now put in a temporary fix so that Kafka Streams works with our > framework but it feels like this might be a missing feature or potentially a > bug. > +Example+ > Given: > * We have two messages (InA and InB). > * Both messages have the same partition key. > * A rebalance is in progress so streams is no longer able to commit. > When: > # Message InA -> processor -> OutA (not committed) > # Message InB -> processor -> blocked because #1 has not been committed -- This message was sent by Atlassian Jira (v8.20.10#820010)
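A sketch of the described check; the field and method names are assumptions based on the existing StreamThread and TaskManager code, not the exact temporary fix:

```
// Inside StreamThread.runOnce(): pause record processing while a rebalance is in progress.
if (taskManager.rebalanceInProgress()) {
    numIterations = 0;  // stop handing records to tasks until the rebalance completes
} else if (numIterations == 0) {
    numIterations = 1;  // rebalance finished; resume processing
}
```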
[jira] [Assigned] (KAFKA-15105) Flaky test FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable
[ https://issues.apache.org/jira/browse/KAFKA-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Max Riedel reassigned KAFKA-15105: -- Assignee: Max Riedel > Flaky test > FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable > - > > Key: KAFKA-15105 > URL: https://issues.apache.org/jira/browse/KAFKA-15105 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.5.0 >Reporter: Josep Prat >Assignee: Max Riedel >Priority: Major > Labels: flaky-test > > Test > integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable() > became flaky. An example can be found here: > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable__/] > The error might be caused because of a previous kafka cluster used for > another test wasn't cleaned up properly before this one run. > > h3. Error Message > {code:java} > org.apache.kafka.common.errors.TopicExistsException: Topic > '__consumer_offsets' already exists.{code} > h3. Stacktrace > {code:java} > org.apache.kafka.common.errors.TopicExistsException: Topic > '__consumer_offsets' already exists. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15105) Flaky test FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable
[ https://issues.apache.org/jira/browse/KAFKA-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738077#comment-17738077 ] Max Riedel commented on KAFKA-15105: Hi [~josep.prat] Thanks for giving me the necessary Jira rights. I was able to assign the ticket to me now. So far, all test runs I did on my local environment passed. But I will try the option to run until failure and see what I can learn from that. My question was about the CI. Is it correct that we are not able to rerun builds there or get more detailed output? I couldn't find the error message "Topic '__consumer_offsets' already exists." in the standard output of the test run > Flaky test > FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable > - > > Key: KAFKA-15105 > URL: https://issues.apache.org/jira/browse/KAFKA-15105 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.5.0 >Reporter: Josep Prat >Assignee: Max Riedel >Priority: Major > Labels: flaky-test > > Test > integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable() > became flaky. An example can be found here: > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable__/] > The error might be caused because of a previous kafka cluster used for > another test wasn't cleaned up properly before this one run. > > h3. Error Message > {code:java} > org.apache.kafka.common.errors.TopicExistsException: Topic > '__consumer_offsets' already exists.{code} > h3. Stacktrace > {code:java} > org.apache.kafka.common.errors.TopicExistsException: Topic > '__consumer_offsets' already exists. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
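One way to do the run-until-failure loop locally (an assumption about tooling, not a project convention):

```
# Re-run the single flaky test until it fails.
while ./gradlew :core:test --rerun-tasks \
    --tests "integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable"; do :; done
```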
[jira] [Commented] (KAFKA-15105) Flaky test FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable
[ https://issues.apache.org/jira/browse/KAFKA-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738081#comment-17738081 ] Josep Prat commented on KAFKA-15105: Hi [~riedelmax] , Only maintainers + a subgroup of collaborators can rerun builds in CI, but even for them, they can just run them as they are (no more detailed output). And sorry, I just realized I copy-pasted the wrong output. This is the right one: [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable___2/] I'm updating the description of the Jira issue. > Flaky test > FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable > - > > Key: KAFKA-15105 > URL: https://issues.apache.org/jira/browse/KAFKA-15105 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.5.0 >Reporter: Josep Prat >Assignee: Max Riedel >Priority: Major > Labels: flaky-test > > Test > integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable() > became flaky. An example can be found here: > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable__/] > The error might be caused because of a previous kafka cluster used for > another test wasn't cleaned up properly before this one run. > > h3. Error Message > {code:java} > org.apache.kafka.common.errors.TopicExistsException: Topic > '__consumer_offsets' already exists.{code} > h3. Stacktrace > {code:java} > org.apache.kafka.common.errors.TopicExistsException: Topic > '__consumer_offsets' already exists. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15105) Flaky test FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable
[ https://issues.apache.org/jira/browse/KAFKA-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josep Prat updated KAFKA-15105: --- Description: Test integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable() became flaky. An example can be found here: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable___2/ The error might be caused because of a previous kafka cluster used for another test wasn't cleaned up properly before this one run. h3. Error Message {code:java} org.apache.kafka.common.errors.TopicExistsException: Topic '__consumer_offsets' already exists.{code} h3. Stacktrace {code:java} org.apache.kafka.common.errors.TopicExistsException: Topic '__consumer_offsets' already exists. {code} was: Test integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable() became flaky. An example can be found here: [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable__/] The error might be caused because of a previous kafka cluster used for another test wasn't cleaned up properly before this one run. h3. Error Message {code:java} org.apache.kafka.common.errors.TopicExistsException: Topic '__consumer_offsets' already exists.{code} h3. Stacktrace {code:java} org.apache.kafka.common.errors.TopicExistsException: Topic '__consumer_offsets' already exists. {code} > Flaky test > FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable > - > > Key: KAFKA-15105 > URL: https://issues.apache.org/jira/browse/KAFKA-15105 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.5.0 >Reporter: Josep Prat >Assignee: Max Riedel >Priority: Major > Labels: flaky-test > > Test > integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable() > became flaky. An example can be found here: > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable___2/ > The error might be caused because of a previous kafka cluster used for > another test wasn't cleaned up properly before this one run. > > h3. Error Message > {code:java} > org.apache.kafka.common.errors.TopicExistsException: Topic > '__consumer_offsets' already exists.{code} > h3. Stacktrace > {code:java} > org.apache.kafka.common.errors.TopicExistsException: Topic > '__consumer_offsets' already exists. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15105) Flaky test FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable
[ https://issues.apache.org/jira/browse/KAFKA-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738081#comment-17738081 ] Josep Prat edited comment on KAFKA-15105 at 6/28/23 12:03 PM: -- Hi [~riedelmax] , Only maintainers + a subgroup of collaborators can rerun builds in CI, but even for them, they can just run them as they are (no more detailed output). And sorry, I just realized I copy-pasted the wrong CI build link. This is the right one: [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable___2/] I'm updating the description of the Jira issue. was (Author: jlprat): Hi [~riedelmax] , Only maintainers + a subgroup of collaborators can rerun builds in CI, but even for them, they can just run them as they are (no more detailed output). And sorry, I just realized I copy-pasted the wrong output. This is the right one: [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable___2/] I'm updating the description of the Jira issue. > Flaky test > FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable > - > > Key: KAFKA-15105 > URL: https://issues.apache.org/jira/browse/KAFKA-15105 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.5.0 >Reporter: Josep Prat >Assignee: Max Riedel >Priority: Major > Labels: flaky-test > > Test > integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable() > became flaky. An example can be found here: > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable___2/ > The error might be caused because of a previous kafka cluster used for > another test wasn't cleaned up properly before this one run. > > h3. Error Message > {code:java} > org.apache.kafka.common.errors.TopicExistsException: Topic > '__consumer_offsets' already exists.{code} > h3. Stacktrace > {code:java} > org.apache.kafka.common.errors.TopicExistsException: Topic > '__consumer_offsets' already exists. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation
divijvaidya commented on code in PR #13923: URL: https://github.com/apache/kafka/pull/13923#discussion_r1245118305 ## storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java: ## @@ -107,20 +110,22 @@ InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, * @param endPosition end position of log segment to be read, inclusive. * @return input stream of the requested log segment data. * @throws RemoteStorageException if there are any errors while fetching the desired segment. - * @throws RemoteResourceNotFoundException when there are no resources associated with the given remoteLogSegmentMetadata. + * @throws RemoteResourceNotFoundException the requested log segment is not found in the remote storage. */ InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, int startPosition, int endPosition) throws RemoteStorageException; /** * Returns the index for the respective log segment of {@link RemoteLogSegmentMetadata}. + * + * If the index is not present (e.g. Transaction index may not exist), throws {@link RemoteResourceNotFoundException} Review Comment: Perhaps, we can be more descriptive here. My suggestion is to add the following (feel free to change the wording): `e.g. Transaction index may not exist because segments created prior to version 2.8.0 will not have a transaction index associated with them.` `@throws RemoteResourceNotFoundException the requested index is not found in the remote storage. Callers of this function are encouraged to re-create the indexes from the segment as the suggested way of handling this error.` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation
showuon commented on PR #13923: URL: https://github.com/apache/kafka/pull/13923#issuecomment-1611315037 I'll take a look this week. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
dajac commented on PR #13880: URL: https://github.com/apache/kafka/pull/13880#issuecomment-1611323500 @jolshan Thanks. I have addressed the remaining comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1245148023 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -88,10 +93,12 @@ public class GroupMetadataManager { public static class Builder { private LogContext logContext = null; private SnapshotRegistry snapshotRegistry = null; +private Time time = null; private List assignors = null; -private TopicsImage topicsImage = null; Review Comment: We actually need other features from the MetadataImage (e.g. metadata version). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1245154086 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -506,32 +555,54 @@ private CoordinatorResult consumerGr .setClientHost(clientHost) .build(); +boolean updatedMemberSubscriptions = false; if (!updatedMember.equals(member)) { records.add(newMemberSubscriptionRecord(groupId, updatedMember)); if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) { log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed topics to: " + updatedMember.subscribedTopicNames()); +updatedMemberSubscriptions = true; +} -subscriptionMetadata = group.computeSubscriptionMetadata( -member, -updatedMember, -topicsImage -); - -if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { -log.info("[GroupId " + groupId + "] Computed new subscription metadata: " -+ subscriptionMetadata + "."); -records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); -} +if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) { +log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed regex to: " + +updatedMember.subscribedTopicRegex()); +updatedMemberSubscriptions = true; +} +} -groupEpoch += 1; -records.add(newGroupEpochRecord(groupId, groupEpoch)); +long currentTimeMs = time.milliseconds(); +boolean maybeUpdateMetadata = updatedMemberSubscriptions || group.refreshMetadataNeeded(currentTimeMs); +boolean updatedSubscriptionMetadata = false; +if (maybeUpdateMetadata) { +subscriptionMetadata = group.computeSubscriptionMetadata( +member, +updatedMember, +metadataImage.topics() +); -log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + "."); +if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { +log.info("[GroupId " + groupId + "] Computed new subscription metadata: " ++ subscriptionMetadata + "."); +records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); +updatedSubscriptionMetadata = true; } } +if (updatedMemberSubscriptions || updatedSubscriptionMetadata) { +groupEpoch += 1; +records.add(newGroupEpochRecord(groupId, groupEpoch)); +log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + "."); +} + +if (maybeUpdateMetadata) { Review Comment: Reworked this part. Let me know if it looks better. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1245156156 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -709,14 +780,16 @@ public void replay( String groupId = key.groupId(); String memberId = key.memberId(); +ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, value != null); +Set oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames()); + if (value != null) { -ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, true); ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, true); consumerGroup.updateMember(new ConsumerGroupMember.Builder(oldMember) .updateWith(value) .build()); +updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames()); Review Comment: Good call. Moved it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1245157302 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -874,4 +1021,34 @@ public void replay( consumerGroup.updateMember(newMember); } } + +/** + * A new metadata image is available. + * + * @param newImage The new metadata image. + * @param delta The delta image. + */ +public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { +metadataImage = newImage; + +// Notify all the groups subscribed to the created, updated or +// deleted topics. +Set allGroupIds = new HashSet<>(); +delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> { +String topicName = topicDelta.name(); +Set groupIds = groupsByTopics.get(topicName); +if (groupIds != null) allGroupIds.addAll(groupIds); +}); +delta.topicsDelta().deletedTopicIds().forEach(topicId -> { +TopicImage topicImage = delta.image().topics().getTopic(topicId); +Set groupIds = groupsByTopics.get(topicImage.name()); +if (groupIds != null) allGroupIds.addAll(groupIds); Review Comment: Correct. I also simplified this code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hudeqi opened a new pull request, #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown
hudeqi opened a new pull request, #13924: URL: https://github.com/apache/kafka/pull/13924 This PR removes the metrics in LogCleanerManager when the LogCleaner is shut down. It has passed the corresponding unit tests and is part of KAFKA-15129. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1245209953 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -874,4 +1021,34 @@ public void replay( consumerGroup.updateMember(newMember); } } + +/** + * A new metadata image is available. + * + * @param newImage The new metadata image. + * @param delta The delta image. + */ +public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { +metadataImage = newImage; + +// Notify all the groups subscribed to the created, updated or Review Comment: > Is created topics a different method in topicsDelta? Shouldn't we have createdTopicIds and we add them? Or is changedTopics accounting for that? Are there other changes besides topic creation we can have? My understanding is that created topics are included in the changed topics as well. > I guess my question then is what is the flow for updating the groups with the image? This will just happen on the next heartbeat since we set metadataImage to new image? Right. The idea is to flag all the groups subscribed topics and to let them update themselves on the next heartbeat. I did it this way because we can also rely on this mechanism to refresh regex based subs every X minutes later on. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1245212300 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -119,6 +131,18 @@ public String toString() { */ private final TimelineHashMap> currentPartitionEpoch; +/** + * The next metadata refresh time. It consists of a timestamp in milliseconds together with + * the group epoch at the time of setting it. The metadata refresh time is considered as a + * soft state (read that it is not stored in a timeline data structure). It is like this + * because it is not persisted to the log. The group epoch is here to ensure that the + * next metadata refresh time is invalidated if the group epoch does not correspond to + * the current group epoch. This can happen if the next metadata refresh time is updated + * after having refreshed the metadata but the write operation failed. In this case, the + * time is not automatically rolled back. Review Comment: It is actually the other way around. The refresh time is updated immediately but it is not rolled back if the write failed. This is the reason why I have included the group epoch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1245213037 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -423,6 +456,47 @@ public Map computeSubscriptionMetadata( return Collections.unmodifiableMap(newSubscriptionMetadata); } +/** + * Updates the next metadata refresh time. + * + * @param nextTimeMs The next time in milliseconds. + * @param groupEpoch The associated group epoch. + */ +public void setNextMetadataRefreshTime( +long nextTimeMs, +int groupEpoch +) { +this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, groupEpoch); +} + +/** + * Resets the next metadata refresh. Review Comment: Right. Update immediately on the next heartbeat. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1245214287 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -423,6 +456,47 @@ public Map computeSubscriptionMetadata( return Collections.unmodifiableMap(newSubscriptionMetadata); } +/** + * Updates the next metadata refresh time. + * + * @param nextTimeMs The next time in milliseconds. + * @param groupEpoch The associated group epoch. + */ +public void setNextMetadataRefreshTime( +long nextTimeMs, +int groupEpoch +) { +this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, groupEpoch); +} + +/** + * Resets the next metadata refresh. + */ +public void resetNextMetadataRefreshTime() { +this.nextMetadataRefreshTime = TimeAndEpoch.EMPTY; +} + +/** + * Checks if a metadata refresh is required. A refresh is required in two cases: + * 1) The next update time is smaller or equals to the current time; + * 2) The group epoch associated with the next update time is smaller than Review Comment: Right. In this case, the time is set to zero so it is always smaller than the current time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1245226991 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -141,7 +141,7 @@ public static class TimeAndEpoch { * after having refreshed the metadata but the write operation failed. In this case, the * time is not automatically rolled back. */ -private TimeAndEpoch nextMetadataRefreshTime = TimeAndEpoch.EMPTY; Review Comment: I renamed this one. It seems that `deadline` is more appropriate than `nextTime`.
[GitHub] [kafka] jeqo commented on a diff in pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation
jeqo commented on code in PR #13923: URL: https://github.com/apache/kafka/pull/13923#discussion_r1245252224 ## storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java: ## @@ -107,20 +110,22 @@ InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, * @param endPosition end position of log segment to be read, inclusive. * @return input stream of the requested log segment data. * @throws RemoteStorageException if there are any errors while fetching the desired segment. - * @throws RemoteResourceNotFoundException when there are no resources associated with the given remoteLogSegmentMetadata. + * @throws RemoteResourceNotFoundException the requested log segment is not found in the remote storage. */ InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, int startPosition, int endPosition) throws RemoteStorageException; /** * Returns the index for the respective log segment of {@link RemoteLogSegmentMetadata}. + * + * If the index is not present (e.g. Transaction index may not exist), throws {@link RemoteResourceNotFoundException} Review Comment: thanks @divijvaidya, applying this suggestion
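For callers, the documented contract makes a missing index an expected condition rather than a failure. A hedged sketch of handling it (the wrapper is illustrative; fetchIndex and IndexType belong to the RemoteStorageManager API under discussion):

    // Illustrative only: treat an absent transaction index as "no index",
    // while other remote storage errors still propagate to the caller.
    InputStream fetchTxnIndexOrNull(RemoteStorageManager rsm,
                                    RemoteLogSegmentMetadata metadata) throws RemoteStorageException {
        try {
            return rsm.fetchIndex(metadata, RemoteStorageManager.IndexType.TRANSACTION);
        } catch (RemoteResourceNotFoundException e) {
            return null; // the transaction index may legitimately not exist
        }
    }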
[GitHub] [kafka] lucasbru commented on pull request #13876: KAFKA-10733: Clean up producer exceptions
lucasbru commented on PR #13876: URL: https://github.com/apache/kafka/pull/13876#issuecomment-1611471103 @jolshan Yes, the main reason for wrapping was consistency. However, I'm now considering a different kind of consistency - never wrap fatal errors - which is what was originally suggested in the KIP.
[GitHub] [kafka] joobisb commented on a diff in pull request #13862: KAFKA-15050: format the prompts in the quickstart
joobisb commented on code in PR #13862: URL: https://github.com/apache/kafka/pull/13862#discussion_r1245268151 ## docs/quickstart.html: ## @@ -154,9 +154,9 @@ By default, each line you enter will result in a separate event being written to the topic. -$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 -This is my first event -This is my second event +$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 +$ This is my first event +$ This is my second event Review Comment: resolved
[GitHub] [kafka] jeqo commented on pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation
jeqo commented on PR #13923: URL: https://github.com/apache/kafka/pull/13923#issuecomment-1611480623 @divijvaidya adding the following changes to the KIP: https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=97554472&selectedPageVersions=363&selectedPageVersions=362
[GitHub] [kafka] cadonna opened a new pull request, #13925: KAFKA-10199: Consider tasks in state updater when computing offset sums
cadonna opened a new pull request, #13925: URL: https://github.com/apache/kafka/pull/13925 With the state updater, the task manager also needs to look at the tasks owned by the state updater when computing the sum of offsets of the state. This sum of offsets is used by the high availability assignor to assign warm-up replicas. If the task manager does not take into account tasks in the state updater, a warm-up replica will never report back that the state for the corresponding task has caught up. Consequently, the warm-up replica will never be dismissed and probing rebalances will never end. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
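The offset sum mentioned above is conceptually just the sum of the latest changelog offsets per task; a rough sketch under assumed names (the real code also handles sentinel offsets and overflow):

    // Hypothetical sketch: sum a task's changelog offsets so the high
    // availability assignor can judge how caught up a warm-up replica is.
    static long sumOfChangelogOffsets(final Map<TopicPartition, Long> changelogOffsets) {
        long sum = 0L;
        for (final Long offset : changelogOffsets.values()) {
            if (offset != null && offset >= 0) { // skip unknown/sentinel offsets
                sum += offset;
            }
        }
        return sum;
    }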
[GitHub] [kafka] chia7712 merged pull request #13902: MINOR: fix flaky ZkMigrationIntegrationTest.testNewAndChangedTopicsIn…
chia7712 merged PR #13902: URL: https://github.com/apache/kafka/pull/13902
[GitHub] [kafka] divijvaidya merged pull request #13884: MINOR: fix typos for client
divijvaidya merged PR #13884: URL: https://github.com/apache/kafka/pull/13884
[GitHub] [kafka] divijvaidya commented on pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation
divijvaidya commented on PR #13923: URL: https://github.com/apache/kafka/pull/13923#issuecomment-1611589151 Thanks @jeqo. Looks good to me but we will wait for Luke and/or Satish to look into this before we merge this one in.
[GitHub] [kafka] jeqo commented on pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation
jeqo commented on PR #13923: URL: https://github.com/apache/kafka/pull/13923#issuecomment-1611590682 Sure, makes sense
[GitHub] [kafka] divijvaidya commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
divijvaidya commented on PR #13798: URL: https://github.com/apache/kafka/pull/13798#issuecomment-1611605733 > so only that one verification per transaction Ah, I had missed the part that it will not be recorded on "every" message append, only for verified cases. I think we should be good to merge this in without worrying about the latency impact. I don't suppose a single histogram should add much, and we also didn't see an impact in the producer perf test that you did here.
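For context, the recording pattern under discussion, sketched with illustrative names (one histogram update per verified transaction, not per message append):

    // Illustrative: only appends that actually went through verification
    // pay the cost of a histogram update.
    final long startTimeMs = time.milliseconds();
    verifyTransaction(batch);
    verificationTimeMsHist.update(time.milliseconds() - startTimeMs);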
[GitHub] [kafka] lucasbru commented on a diff in pull request #13925: KAFKA-10199: Consider tasks in state updater when computing offset sums
lucasbru commented on code in PR #13925: URL: https://github.com/apache/kafka/pull/13925#discussion_r1245383102 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1138,28 +1138,35 @@ public void signalResume() { public Map<TaskId, Long> getTaskOffsetSums() { final Map<TaskId, Long> taskOffsetSums = new HashMap<>(); -// Not all tasks will create directories, and there may be directories for tasks we don't currently own, -// so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should -// just have an empty changelogOffsets map. -for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.allTaskIds())) { -final Task task = tasks.contains(id) ? tasks.task(id) : null; -// Closed and uninitialized tasks don't have any offsets so we should read directly from the checkpoint -if (task != null && task.state() != State.CREATED && task.state() != State.CLOSED) { +final Map<TaskId, Task> tasks = allTasks(); +final Set<TaskId> createdAndClosedTasks = new HashSet<>(); +for (final Task task : tasks.values()) { +if (task.state() != State.CREATED && task.state() != State.CLOSED) { final Map<TopicPartition, Long> changelogOffsets = task.changelogOffsets(); if (changelogOffsets.isEmpty()) { -log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", id); +log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", +task.id()); } else { -taskOffsetSums.put(id, sumOfChangelogOffsets(id, changelogOffsets)); +taskOffsetSums.put(task.id(), sumOfChangelogOffsets(task.id(), changelogOffsets)); } } else { -final File checkpointFile = stateDirectory.checkpointFileFor(id); -try { -if (checkpointFile.exists()) { -taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read())); -} -} catch (final IOException e) { -log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e); +createdAndClosedTasks.add(task.id()); Review Comment: nit: if you want to do it with fewer collections, you could initialize `lockedTaskDirectoriesOfNonOwnedTasks` earlier, and just remove directly from that set in the `if` branch, instead of adding to `createdAndClosedTasks` in the `else` branch.
[GitHub] [kafka] lucasbru commented on a diff in pull request #13925: KAFKA-10199: Consider tasks in state updater when computing offset sums
lucasbru commented on code in PR #13925: URL: https://github.com/apache/kafka/pull/13925#discussion_r1245427580 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1138,28 +1138,33 @@ public void signalResume() { public Map<TaskId, Long> getTaskOffsetSums() { final Map<TaskId, Long> taskOffsetSums = new HashMap<>(); -// Not all tasks will create directories, and there may be directories for tasks we don't currently own, -// so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should -// just have an empty changelogOffsets map. -for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.allTaskIds())) { -final Task task = tasks.contains(id) ? tasks.task(id) : null; -// Closed and uninitialized tasks don't have any offsets so we should read directly from the checkpoint -if (task != null && task.state() != State.CREATED && task.state() != State.CLOSED) { +final Map<TaskId, Task> tasks = allTasks(); +final Set<TaskId> lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks = Review Comment: Ah, I recommended this change thinking that `lockedTaskDirectories` always includes all `ClosedAndCreatedTasks` -- I think it does, right? So it should be enough to assign this to `lockedTaskDirectories`.
[GitHub] [kafka] divijvaidya merged pull request #13922: [MINOR] remove the currentStream.close() statement causing exit code issue
divijvaidya merged PR #13922: URL: https://github.com/apache/kafka/pull/13922
[GitHub] [kafka] divijvaidya commented on pull request #13902: MINOR: fix flaky ZkMigrationIntegrationTest.testNewAndChangedTopicsIn…
divijvaidya commented on PR #13902: URL: https://github.com/apache/kafka/pull/13902#issuecomment-1611682088 @mumrah Should we backport this to the 3.5 branch as well, since the code that introduced this flakiness exists over there too?
[GitHub] [kafka] jolshan commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on PR #13798: URL: https://github.com/apache/kafka/pull/13798#issuecomment-1611687116 Thanks for the review 😄
[jira] [Created] (KAFKA-15132) Implement disable & re-enablement for Tiered Storage
Divij Vaidya created KAFKA-15132: Summary: Implement disable & re-enablement for Tiered Storage Key: KAFKA-15132 URL: https://issues.apache.org/jira/browse/KAFKA-15132 Project: Kafka Issue Type: New Feature Components: core Reporter: Divij Vaidya Assignee: Divij Vaidya KIP-405 [1] introduces the Tiered Storage feature in Apache Kafka. One of the limitations mentioned in the KIP is the inability to re-enable TS on a topic after it has been disabled. {quote}Once tier storage is enabled for a topic, it can not be disabled. We will add this feature in future versions. One possible workaround is to create a new topic and copy the data from the desired offset and delete the old topic. {quote} This task will propose a new KIP which extends KIP-405 to describe the behaviour on disablement and re-enablement of tiered storage for a topic. The solution will apply for both Zk and KRaft variants. [1] KIP-405 - [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]
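For reference, per-topic tiering is switched on today via the remote.storage.enable topic config; a hedged sketch of the usual command (the topic name is an example):

    bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
      --entity-type topics --entity-name my-topic \
      --add-config remote.storage.enable=true

The KIP drafted here would define what happens when that flag is later turned off and turned on again.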
[jira] [Commented] (KAFKA-15132) Implement disable & re-enablement for Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-15132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738189#comment-17738189 ] Divij Vaidya commented on KAFKA-15132: -- [~mehbey] and I are currently working on drafting a KIP for this. KIP publish ETA: 10th July. > Implement disable & re-enablement for Tiered Storage > > > Key: KAFKA-15132 > URL: https://issues.apache.org/jira/browse/KAFKA-15132 > Project: Kafka > Issue Type: New Feature > Components: core >Reporter: Divij Vaidya >Assignee: Divij Vaidya >Priority: Major > Labels: kip > > KIP-405 [1] introduces the Tiered Storage feature in Apache Kafka. One of the > limitations mentioned in the KIP is the inability to re-enable TS on a topic > after it has been disabled. > {quote}Once tier storage is enabled for a topic, it can not be disabled. We > will add this feature in future versions. One possible workaround is to > create a new topic and copy the data from the desired offset and delete the > old topic. > {quote} > This task will propose a new KIP which extends KIP-405 to describe the > behaviour on disablement and re-enablement of tiered storage for a topic. > The solution will apply for both Zk and KRaft variants. > [1] KIP-405 - > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]
[GitHub] [kafka] jolshan merged pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan merged PR #13798: URL: https://github.com/apache/kafka/pull/13798
[GitHub] [kafka] hudeqi opened a new pull request, #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown
hudeqi opened a new pull request, #13926: URL: https://github.com/apache/kafka/pull/13926 This PR removes the metrics in GroupMetadataManager on shutdown. It passes the corresponding unit tests and is part of [KAFKA-15129](https://issues.apache.org/jira/browse/KAFKA-15129). A special advantage is that, since the metrics registered through metricsGroup are removed during shutdown, the original "recreateGauge" method (mentioned in [KAFKA-5565](https://github.com/apache/kafka/pull/3506)) is no longer needed.
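A hedged sketch of the cleanup pattern the PR applies (the metric names are examples; the real list is whatever the manager registered at startup):

    // Illustrative shutdown hook: remove every gauge this manager registered,
    // so a re-created manager no longer needs the old "recreateGauge" workaround.
    public void shutdown() {
        metricsGroup.removeMetric("NumOffsets");
        metricsGroup.removeMetric("NumGroups");
        // ... one removeMetric call per metric registered at startup
    }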
[jira] [Created] (KAFKA-15133) RequestMetrics MessageConversionsTimeMs count is ticked even when no conversion occurs
Edoardo Comar created KAFKA-15133: - Summary: RequestMetrics MessageConversionsTimeMs count is ticked even when no conversion occurs Key: KAFKA-15133 URL: https://issues.apache.org/jira/browse/KAFKA-15133 Project: Kafka Issue Type: Bug Components: core Affects Versions: 3.4.1, 3.5.0 Reporter: Edoardo Comar Assignee: Edoardo Comar The Histogram RequestChannel.messageConversionsTimeHist is ticked even when a Produce/Fetch request incurred no conversion, because a new entry is added to the histogram distribution with a 0ms value. It's confusing to compare the Histogram kafka.network RequestMetrics MessageConversionsTimeMs with the Meter kafka.server BrokerTopicMetrics ProduceMessageConversionsPerSec because for the latter, the metric is ticked only if a conversion actually occurred
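A minimal sketch of the fix direction the report implies (the broker code is Scala; this Java-style fragment only shows the guard, not the actual patch):

    // Keep the histogram's count consistent with ProduceMessageConversionsPerSec
    // by skipping the zero-valued samples of requests that needed no conversion.
    if (conversionTimeMs > 0) {
        messageConversionsTimeHist.update(conversionTimeMs);
    }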
[GitHub] [kafka] cadonna commented on a diff in pull request #13925: KAFKA-10199: Consider tasks in state updater when computing offset sums
cadonna commented on code in PR #13925: URL: https://github.com/apache/kafka/pull/13925#discussion_r1245477150 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1138,28 +1138,33 @@ public void signalResume() { public Map<TaskId, Long> getTaskOffsetSums() { final Map<TaskId, Long> taskOffsetSums = new HashMap<>(); -// Not all tasks will create directories, and there may be directories for tasks we don't currently own, -// so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should -// just have an empty changelogOffsets map. -for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.allTaskIds())) { -final Task task = tasks.contains(id) ? tasks.task(id) : null; -// Closed and uninitialized tasks don't have any offsets so we should read directly from the checkpoint -if (task != null && task.state() != State.CREATED && task.state() != State.CLOSED) { +final Map<TaskId, Task> tasks = allTasks(); +final Set<TaskId> lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks = Review Comment: I do not think there is a guarantee that `lockedTaskDirectories` contains any tasks the client owns. `lockedTaskDirectories` are just the non-empty task directories in the state directory when a rebalance starts. However, a task directory is created when a task is created, i.e., it is in state `CREATED`. A task directory is not deleted when a task is closed, i.e., in state `CLOSED`. This might be a correlation and not a thought-out invariant. At least, the original code did not rely on this since it used `union(HashSet::new, lockedTaskDirectories, tasks.allTaskIds())`. I am also somewhat reluctant to rely on such a -- IMO -- brittle invariant. As an example, in future we could decide to move the creation of the task directory to other parts of the code -- like when the task is initialized -- which would mean that there is an interval in which the task is in state `CREATED` but does not have a task directory.
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jeffkbkim commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1245486249 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java: ## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RecordSerdeTest { +@Test +public void testSerializeKey() { +RecordSerde serializer = new RecordSerde(); +Record record = new Record( +new ApiMessageAndVersion( +new ConsumerGroupMetadataKey().setGroupId("group"), +(short) 1 Review Comment: in general, it looks like we allow different key/value pairs, i.e. OffsetCommitKey + GroupMetadataValue. I guess we would hit an error at runtime when we replay a record with an invalid pair.
I feel like we should enforce this when serializing. wdyt?
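One hedged way to realize this suggestion at serialization time (purely a sketch, not the PR's code; it assumes the generated key and value messages report matching api keys):

    // Illustrative guard: reject a record whose key and value types do not
    // belong together, instead of failing later while replaying the log.
    private static void validateKeyValuePair(final ApiMessage key, final ApiMessage value) {
        if (value != null && key.apiKey() != value.apiKey()) {
            throw new IllegalArgumentException("Key type " + key.apiKey()
                + " cannot be paired with value type " + value.apiKey());
        }
    }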
[GitHub] [kafka] cadonna opened a new pull request, #13927: KAFKA-10199: Enable state updater by default
cadonna opened a new pull request, #13927: URL: https://github.com/apache/kafka/pull/13927 Now that the implementation for the state updater is done, we can enable it by default. This PR enables the state updater by default and fixes code that made assumptions that are not true when the state updater is enabled (mainly tests). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jeffkbkim commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1245489701 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java: ## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RecordSerdeTest { +@Test +public void testSerializeKey() { +RecordSerde serializer = new RecordSerde(); +Record record = new Record( +new ApiMessageAndVersion( +new ConsumerGroupMetadataKey().setGroupId("group"), +(short) 1 Review Comment: nvm, just saw the comment below
[jira] [Commented] (KAFKA-15133) RequestMetrics MessageConversionsTimeMs count is ticked even when no conversion occurs
[ https://issues.apache.org/jira/browse/KAFKA-15133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738205#comment-17738205 ] Edoardo Comar commented on KAFKA-15133: --- [~rsivaram] - reading KIP-188 I am not sure if the intent of the MessageConversionsTimeMs Histogram was also to record the number of times there was no conversion, so that if, say, only 1% of messages required conversions, we'd see that in the percentile distribution. I found this while comparing the count with the one in BrokerTopicMetrics, where the mismatch was obvious. > RequestMetrics MessageConversionsTimeMs count is ticked even when no > conversion occurs > -- > > Key: KAFKA-15133 > URL: https://issues.apache.org/jira/browse/KAFKA-15133 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.5.0, 3.4.1 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Minor > > The Histogram RequestChannel.messageConversionsTimeHist > is ticked even when a Produce/Fetch request incurred no conversion, > because a new entry is added to the histogram distribution, with a 0ms value. > > It's confusing comparing the Histogram > kafka.network RequestMetrics MessageConversionsTimeMs > with the Meter > kafka.server BrokerTopicMetrics ProduceMessageConversionsPerSec > because for the latter, the metric is ticked only if a conversion actually > occurred
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jeffkbkim commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1245495804 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; + +/** + * Serializer/Deserializer for {{@link Record}}. + */ +public class RecordSerde implements PartitionWriter.Serializer, CoordinatorLoader.Deserializer { +@Override +public byte[] serializeKey(Record record) { Review Comment: would that require a version bump or would we not have to since it's not actually used anywhere?
[GitHub] [kafka] flashmouse commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
flashmouse commented on PR #13920: URL: https://github.com/apache/kafka/pull/13920#issuecomment-1611775967 @rreddy-22 that's what I mean, thank you. Could anyone merge this if possible, or help review to verify this PR is useful? We are now using the CooperativeSticky strategy with kafka-client 3.5 but hit a serious problem: it may trigger a colossal number of rebalances (one rebalance completes in about 3 seconds, then new rebalances keep being triggered for more than 10 minutes; that is, at least 200 rebalances). For now we cannot give much more useful information, and we want to verify whether this is caused by this problem once this PR is checked.
[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
dajac commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1245517961 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; + +/** + * Serializer/Deserializer for {{@link Record}}. + */ +public class RecordSerde implements PartitionWriter.Serializer, CoordinatorLoader.Deserializer { +@Override +public byte[] serializeKey(Record record) { Review Comment: I see this as an internal change. I was actually thinking about doing something like the following.

```
{
  "apiKey": "3",
  "type": "key",
  "name": "ConsumerGroupMetadataKey",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about": "The group id." }
  ]
}
```

```
{
  "apiKey": "3",
  "type": "value",
  "name": "ConsumerGroupMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "versions": "0+", "type": "int32",
      "about": "The group epoch." }
  ]
}
```

This would allow us to add rules in the generator. For instance, an api key must have a key and a value. This would also allow us to generate something like `ApiMessageType` for the types. More importantly, that would remove that weird usage of the version as the type in the key. We could also consider having both the key and the value defined in one file.
[jira] [Commented] (KAFKA-15083) Passing "remote.log.metadata.*" configs into RLMM
[ https://issues.apache.org/jira/browse/KAFKA-15083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738216#comment-17738216 ] Satish Duggana commented on KAFKA-15083: [~showuon] Can you give details on what you tried? > Passing "remote.log.metadata.*" configs into RLMM > - > > Key: KAFKA-15083 > URL: https://issues.apache.org/jira/browse/KAFKA-15083 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > Based on the > [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]: > remote.log.metadata.* : Default RLMM > implementation creates producer and consumer instances. Common client > properties can be configured with `remote.log.metadata.common.client.` > prefix. User can also pass properties specific to > producer/consumer with `remote.log.metadata.producer.` > and `remote.log.metadata.consumer.` prefixes. These will override properties > with `remote.log.metadata.common.client.` prefix. > Any other properties should be prefixed with > "remote.log.metadata." and these will be passed to > RemoteLogMetadataManager#configure(Map props). > For ex: Security configuration to connect to the local broker > for the listener name configured are passed with props. > > This is missed from the current implementation.
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jeffkbkim commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1245523659 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; + +/** + * Serializer/Deserializer for {{@link Record}}. + */ +public class RecordSerde implements PartitionWriter.Serializer, CoordinatorLoader.Deserializer { +@Override +public byte[] serializeKey(Record record) { Review Comment: thanks, that would be cleaner.
[GitHub] [kafka] dajac commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
dajac commented on PR #13920: URL: https://github.com/apache/kafka/pull/13920#issuecomment-1611790598 @flashmouse Thanks for the PR. It seems that this condition has been around for a while now. Did you observe a similar issue with versions prior to 3.5? I wonder if the changes introduced in 3.5 for the rack-aware assignment made this worse than it used to be. I agree that we need to get to the bottom of this.
[GitHub] [kafka] C0urante commented on a diff in pull request #13915: KAFKA-14930: Document the new PATCH and DELETE offsets REST APIs for Connect
C0urante commented on code in PR #13915: URL: https://github.com/apache/kafka/pull/13915#discussion_r1245532682 ## docs/connect.html: ## @@ -313,7 +313,13 @@ REST API DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration GET /connectors/{name}/topics - get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued PUT /connectors/{name}/topics/reset - send a request to empty the set of active topics of a connector -GET /connectors/{name}/offsets - get the current offsets for a connector (see KIP-875 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect) for more details) +Offsets management REST APIs (see KIP-875 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect) for more details): + +GET /connectors/{name}/offsets - get the current offsets for a connector +DELETE /connectors/{name}/offsets - reset the offsets for a connector. The connector must exist and must be in the stopped state. Review Comment: It's possible; I'll add some GH suggestions demonstrating how.
[GitHub] [kafka] C0urante commented on a diff in pull request #13915: KAFKA-14930: Document the new PATCH and DELETE offsets REST APIs for Connect
C0urante commented on code in PR #13915: URL: https://github.com/apache/kafka/pull/13915#discussion_r1245532972 ## docs/connect.html: ## @@ -301,7 +301,7 @@ REST API GET /connectors/{name}/tasks - get a list of tasks currently running for a connector GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed. Any resources claimed by its tasks are left allocated, which allows the connector to begin processing data quickly once it is resumed. -PUT /connectors/{name}/stop - stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is more efficient from a resource usage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed. +PUT /connectors/{name}/stop - stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is more efficient from a resource usage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed. Note that the offsets for a connector can be only modified via the offsets management endpoints if it is in the stopped state Review Comment: ```suggestion PUT /connectors/{name}/stop - stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is more efficient from a resource usage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed. Note that the offsets for a connector can be only modified via the offsets management endpoints if it is in the stopped state ``` ## docs/connect.html: ## @@ -313,7 +313,22 @@ REST API DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration GET /connectors/{name}/topics - get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued PUT /connectors/{name}/topics/reset - send a request to empty the set of active topics of a connector -GET /connectors/{name}/offsets - get the current offsets for a connector (see KIP-875 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect) for more details) +Offsets management endpoints (see KIP-875 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect) for more details): + +GET /connectors/{name}/offsets - get the current offsets for a connector +DELETE /connectors/{name}/offsets - reset the offsets for a connector. The connector must exist and must be in the stopped state +PATCH /connectors/{name}/offsets - alter the offsets for a connector. The connector must exist and must be in the stopped state. The request body should be a JSON object containing a JSON array offsets field, similar to the response body of the GET /connectors/{name}/offsets endpoint Review Comment: ```suggestion DELETE /connectors/{name}/offsets - reset the offsets for a connector. The connector must exist and must be in the stopped state (see PUT /connectors/{name}/stop) PATCH /connectors/{name}/offsets - alter the offsets for a connector. The connector must exist and must be in the stopped state (see PUT /connectors/{name}/stop). The request body should be a JSON object containing a JSON array offsets field, similar to the response body of the GET /connectors/{name}/offsets endpoint ``` ## docs/connect.html: ## @@ -313,7 +313,13 @@ REST API DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration GET /connectors/{name}/topics - get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued PUT /connectors/{name}/topics/reset - send a request to empty the set of active topics of a connector -GET /connectors/{name}/offsets - get the current offsets for a connector (see KIP-875 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect) for more details) +Offsets management REST APIs (see KIP-875 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect) for more details): + +GET /connectors/{name}/offsets - get the current offsets for a connector +DELETE /connectors/{name}/
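For illustration, the PATCH request body mirrors the shape of the GET response; a hedged example for a hypothetical source connector (the partition and offset objects are connector-defined):

    {
      "offsets": [
        {
          "partition": { "filename": "/data/input.txt" },
          "offset": { "position": 42 }
        }
      ]
    }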
[GitHub] [kafka] guizmaii commented on a diff in pull request #13914: KAFKA-14972: Support async runtimes in consumer
guizmaii commented on code in PR #13914: URL: https://github.com/apache/kafka/pull/13914#discussion_r1245542620 ## clients/src/main/java/org/apache/kafka/clients/consumer/ThreadAccessKey.java: ## @@ -0,0 +1,8 @@ +package org.apache.kafka.clients.consumer; + +/** + * A key that can be used to pass access to the Kafka consumer to another thread. + * Can be obtained via {@link KafkaConsumer#getThreadAccessKey()}. + */ +public class ThreadAccessKey { Review Comment: `final`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] flashmouse commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
flashmouse commented on PR #13920: URL: https://github.com/apache/kafka/pull/13920#issuecomment-1611815006 @dajac we hadn't seen this many rebalances before. We previously used CooperativeSticky with kafka-client 3.0.0, and the number of rebalances increased only very slightly. We are not using kafka-client 3.5 directly, because the CooperativeStickyAssignor in 3.5 also requires upgrading the server to 3.5 for the rack-aware assignment logic, and we couldn't upgrade the server right now. So we moved the AbstractStickyAssignor and CooperativeStickyAssignor logic into our application and moved the rackInfo into userData. Although we passed all the kafka-client unit tests after making this change, we still suspect this change may have caused the increase in rebalance time, so we are continuing to check. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on PR #13591: URL: https://github.com/apache/kafka/pull/13591#issuecomment-1611816013 Some new test failures on latest build. I will rebuild and investigate to see if this is new to trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds
ijuma commented on code in PR #12948: URL: https://github.com/apache/kafka/pull/12948#discussion_r124885 ## clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java: ## @@ -290,25 +290,27 @@ public long skip(long toSkip) throws IOException { // Skip bytes stored in intermediate buffer first int avail = count - pos; -long bytesSkipped = (avail < remaining) ? avail : remaining; +int bytesSkipped = Math.min(avail, (int) remaining); Review Comment: Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal
[ https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738231#comment-17738231 ] Matthias J. Sax commented on KAFKA-13973: - The GitHub issue for RocksDB was "declined". I did file a follow-up ticket for Speedb (https://github.com/speedb-io/speedb/issues/583) – maybe we get help there. > block-cache-capacity metrics worth twice as much as normal > -- > > Key: KAFKA-13973 > URL: https://issues.apache.org/jira/browse/KAFKA-13973 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.2.0 >Reporter: Sylvain Le Gouellec >Priority: Minor > Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot > 2022-06-09 at 09.33.50.png > > > I have created a very simple kafka-streams application with 1 state store. > I'm very surprised that the block-cache-capacity metric shows a {{100MB}} > block cache capacity instead of the Kafka Streams default, which is > {{{}50MB{}}}. > > My topology : > StreamsBuilder sb = new StreamsBuilder(); > sb.stream("input") > .groupByKey() > .count() > .toStream() > .to("output"); > > I checked out the {{kafka-streams}} code and saw a strange thing. When the > {{{}RocksDBTimestampedStore{}}} store is created, we try to create two column > families for backward compatibility with a potential old key/value store. > In this method, {{setDbAccessor(col1, col2)}}, if the first column is not > valid, you close it > ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]). > But regarding the RocksDB instance, it seems that the column family is > not deleted completely, and the metrics exposed by [RocksDB continue to > aggregate > (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373] > {{block-cache-capacity}} for both column families (default and > keyValueWithTimestamp). > Maybe you have to explicitly drop the column family in > {{setDbAccessor(col1, col2)}} if the first column is not valid (like > {{{}db.dropColumnFamily(noTimestampColumnFamily);{}}}) > > I tried to drop the {{noTimestampColumnFamily}} in {{setDbAccessor}} if the first > column is not valid, like: > > {code:java} > private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily, >final ColumnFamilyHandle > withTimestampColumnFamily) throws RocksDBException { > final RocksIterator noTimestampsIter = > db.newIterator(noTimestampColumnFamily); > noTimestampsIter.seekToFirst(); > if (noTimestampsIter.isValid()) { > log.info("Opening store {} in upgrade mode", name); > dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, > withTimestampColumnFamily); > } else { > log.info("Opening store {} in regular mode", name); > dbAccessor = new > SingleColumnFamilyAccessor(withTimestampColumnFamily); > noTimestampColumnFamily.close(); > db.dropColumnFamily(noTimestampColumnFamily); // try fix it > } > noTimestampsIter.close(); > }{code} > > > > But it seems that you can't drop the default column family in RocksDB > (see screenshot). > *So how can we get the real block-cache-capacity metric value in Kafka > Streams monitoring?* -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] rreddy-22 commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
rreddy-22 commented on PR #13920: URL: https://github.com/apache/kafka/pull/13920#issuecomment-1611886952 > @flashmouse Thanks for the PR. It seems that this condition has been around for a while now. Did you observe a similar issue with versions prior to 3.5? I wonder if the changes introduced in 3.5 for the rack-aware assignment made this worse than it used to be. I agree that we need to get to the bottom of this. The rack-awareness changes introduce a full iteration through all the potential consumers to check whether any consumer has a matching rack, and in the worst-case scenario, where no consumer with a matching rack is found, the whole iteration happens all over again. Previously we would only iterate through the list once, so the time could potentially have doubled. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
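As a rough illustration of the two-pass cost described above (names and structure are invented for this sketch and are not taken from AbstractStickyAssignor): the first pass scans for a rack-matching consumer, and only when none matches does a second full scan run, which is how the worst case can roughly double.

```java
import java.util.List;
import java.util.Objects;

final class RackAwarePickSketch {
    // Illustrative stand-ins; the real assignor tracks far more state.
    record Consumer(String id, String rack, boolean canTakePartition) { }

    static Consumer pick(List<Consumer> consumers, String partitionRack) {
        // Pass 1: prefer a consumer on the partition's rack.
        for (Consumer c : consumers) {
            if (c.canTakePartition() && Objects.equals(c.rack(), partitionRack)) {
                return c;
            }
        }
        // Pass 2 (the doubled work in the worst case): fall back to any eligible consumer.
        for (Consumer c : consumers) {
            if (c.canTakePartition()) {
                return c;
            }
        }
        return null; // no eligible consumer
    }
}
```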
[GitHub] [kafka] vcrfxia commented on pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)
vcrfxia commented on PR #13855: URL: https://github.com/apache/kafka/pull/13855#issuecomment-1611889559 Hey @wcarlson5 did you mean to push a new commit to this PR since @cadonna and I last reviewed? Some of your replies mention having made changes but I don't see any. Nothing blocking from my side, though I do think that the latest round of suggestions (including test coverage improvements and @cadonna 's suggestion for avoiding an unnecessary range query) would be good to incorporate either in this PR or the next one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
jolshan commented on code in PR #13880: URL: https://github.com/apache/kafka/pull/13880#discussion_r1245609479 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java: ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; + +/** + * Serializer/Deserializer for {{@link Record}}. + */ +public class RecordSerde implements PartitionWriter.Serializer, CoordinatorLoader.Deserializer { +@Override +public byte[] serializeKey(Record record) { Review Comment: Let's file a JIRA if we want to do this in a follow up -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] artemlivshits commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
artemlivshits commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1245541275 ## core/src/main/scala/kafka/network/RequestChannel.scala: ## @@ -240,17 +240,18 @@ object RequestChannel extends Logging { val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos) val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos) val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos) - val fetchMetricNames = + val overrideMetricNames = if (header.apiKey == ApiKeys.FETCH) { - val isFromFollower = body[FetchRequest].isFromFollower - Seq( -if (isFromFollower) RequestMetrics.followFetchMetricName + val specifiedMetricName = +if (body[FetchRequest].isFromFollower) RequestMetrics.followFetchMetricName else RequestMetrics.consumerFetchMetricName - ) + Seq(specifiedMetricName, header.apiKey.name) +} else if (header.apiKey == ApiKeys.ADD_PARTITIONS_TO_TXN && body[AddPartitionsToTxnRequest].allVerifyOnlyRequest) { +Seq(RequestMetrics.verifyPartitionsInTxnMetricName) Review Comment: It looks like we previously had a metric for ADD_PARTITIONS_TO_TXN and now we don't. For FETCH we have the distinct consumer and follower metrics, but we also keep the common metric. ## core/src/main/scala/kafka/network/RequestChannel.scala: ## @@ -240,16 +240,17 @@ object RequestChannel extends Logging { val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos) val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos) val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos) - val fetchMetricNames = + val metricNames = if (header.apiKey == ApiKeys.FETCH) { - val isFromFollower = body[FetchRequest].isFromFollower - Seq( -if (isFromFollower) RequestMetrics.followFetchMetricName + val specifiedMetricName = +if (body[FetchRequest].isFromFollower) RequestMetrics.followFetchMetricName else RequestMetrics.consumerFetchMetricName - ) + Seq(specifiedMetricName, header.apiKey.name) +} else if (header.apiKey == ApiKeys.ADD_PARTITIONS_TO_TXN && body[AddPartitionsToTxnRequest].verifyOnlyRequest()) { +Seq(RequestMetrics.verifyPartitionsInTxnMetricName) +} else { + Seq(header.apiKey.name) } -else Seq.empty - val metricNames = fetchMetricNames :+ header.apiKey.name Review Comment: Looks like we're removing the ADD_PARTITIONS_TO_TXN metric, which would be a breaking change if someone used it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1245628587 ## core/src/main/scala/kafka/network/RequestChannel.scala: ## @@ -240,17 +240,18 @@ object RequestChannel extends Logging { val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos) val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos) val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos) - val fetchMetricNames = + val overrideMetricNames = if (header.apiKey == ApiKeys.FETCH) { - val isFromFollower = body[FetchRequest].isFromFollower - Seq( -if (isFromFollower) RequestMetrics.followFetchMetricName + val specifiedMetricName = +if (body[FetchRequest].isFromFollower) RequestMetrics.followFetchMetricName else RequestMetrics.consumerFetchMetricName - ) + Seq(specifiedMetricName, header.apiKey.name) +} else if (header.apiKey == ApiKeys.ADD_PARTITIONS_TO_TXN && body[AddPartitionsToTxnRequest].allVerifyOnlyRequest) { +Seq(RequestMetrics.verifyPartitionsInTxnMetricName) Review Comment: We are not. It is still seen in the else statement. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1245629399 ## core/src/main/scala/kafka/network/RequestChannel.scala: ## @@ -240,17 +240,18 @@ object RequestChannel extends Logging { val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos) val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos) val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos) - val fetchMetricNames = + val overrideMetricNames = if (header.apiKey == ApiKeys.FETCH) { - val isFromFollower = body[FetchRequest].isFromFollower - Seq( -if (isFromFollower) RequestMetrics.followFetchMetricName + val specifiedMetricName = +if (body[FetchRequest].isFromFollower) RequestMetrics.followFetchMetricName else RequestMetrics.consumerFetchMetricName - ) + Seq(specifiedMetricName, header.apiKey.name) +} else if (header.apiKey == ApiKeys.ADD_PARTITIONS_TO_TXN && body[AddPartitionsToTxnRequest].allVerifyOnlyRequest) { +Seq(RequestMetrics.verifyPartitionsInTxnMetricName) Review Comment: It doesn't make sense to have the common metric here, since we want the two to be distinct. The fetch metrics are different since we have a unique metric for both fetch types. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #13925: KAFKA-10199: Consider tasks in state updater when computing offset sums
lucasbru commented on code in PR #13925: URL: https://github.com/apache/kafka/pull/13925#discussion_r1245664165 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1138,28 +1138,33 @@ public void signalResume() { public Map getTaskOffsetSums() { final Map taskOffsetSums = new HashMap<>(); -// Not all tasks will create directories, and there may be directories for tasks we don't currently own, -// so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should -// just have an empty changelogOffsets map. -for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.allTaskIds())) { -final Task task = tasks.contains(id) ? tasks.task(id) : null; -// Closed and uninitialized tasks don't have any offsets so we should read directly from the checkpoint -if (task != null && task.state() != State.CREATED && task.state() != State.CLOSED) { +final Map tasks = allTasks(); +final Set lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks = Review Comment: Well, if there is no task directory, there is no checkpoint to process. So it's safe to not do anything in this case. All you'd do by adding more tasks is to later skip on the check `checkPointFile.exists()`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
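A hedged sketch of the point in this thread, with invented file names: when a task has no directory on disk there is no checkpoint file, so skipping the task up front and relying on the later existence check are equivalent.

```java
import java.io.File;

final class CheckpointSketch {
    /**
     * Returns the offset sum recorded in a task directory's checkpoint,
     * or -1 when there is nothing to read -- the case discussed above
     * where skipping the task entirely is safe.
     */
    static long offsetSumFromCheckpoint(File taskDir) {
        File checkpointFile = new File(taskDir, ".checkpoint");
        if (!checkpointFile.exists()) {
            return -1L; // no directory/checkpoint => nothing to process
        }
        // A real implementation would parse the checkpoint file here.
        return 0L;
    }
}
```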
[jira] [Resolved] (KAFKA-15028) AddPartitionsToTxnManager metrics
[ https://issues.apache.org/jira/browse/KAFKA-15028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-15028. Resolution: Fixed > AddPartitionsToTxnManager metrics > - > > Key: KAFKA-15028 > URL: https://issues.apache.org/jira/browse/KAFKA-15028 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > Attachments: latency-cpu.html > > > KIP-890 added metrics for the AddPartitionsToTxnManager > VerificationTimeMs – number of milliseconds from adding partition info to the > manager to the time the response is sent. This will include the round trip to > the transaction coordinator if it is called. This will also account for > verifications that fail before the coordinator is called. > VerificationFailureRate – rate of verifications that returned in failure > either from the AddPartitionsToTxn response or through errors in the manager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jeffkbkim commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1245712559 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -141,7 +141,7 @@ public static class TimeAndEpoch { * after having refreshed the metadata but the write operation failed. In this case, the * time is not automatically rolled back. */ -private TimeAndEpoch nextMetadataRefreshTime = TimeAndEpoch.EMPTY; Review Comment: i noticed this across some places. can we rework the comments in `hasMetadataExpired()` and in the test `testNextMetadataRefreshTime()` along with the name to use deadline? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jeffkbkim commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1245682672 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -728,6 +794,81 @@ public void replay( } consumerGroup.removeMember(memberId); } + +updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames()); Review Comment: how does `groupsByTopics` (and `groups`) know that the changes made here are already committed (and won't be reverted)? i think i'm confused because in api handling (i.e. consumer group heartbeat) once we modify the timeline data structures we generate records to commit the offset in the timeline but here we do it in reverse ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -423,6 +456,47 @@ public Map computeSubscriptionMetadata( return Collections.unmodifiableMap(newSubscriptionMetadata); } +/** + * Updates the metadata refresh deadline. + * + * @param deadlineMs The next time in milliseconds. + * @param groupEpoch The associated group epoch. + */ +public void setMetadataRefreshDeadline( +long deadlineMs, +int groupEpoch +) { +this.metadataRefreshDeadline = new DeadlineAndEpoch(deadlineMs, groupEpoch); +} + +/** + * Requests a metadata refresh. + */ +public void requestMetadataRefresh() { +this.metadataRefreshDeadline = DeadlineAndEpoch.EMPTY; +} + +/** + * Checks if a metadata refresh is required. A refresh is required in two cases: + * 1) The deadline is smaller or equals to the current time; Review Comment: nit: "or equal to" ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -423,6 +456,47 @@ public Map computeSubscriptionMetadata( return Collections.unmodifiableMap(newSubscriptionMetadata); } +/** + * Updates the next metadata refresh time. + * + * @param nextTimeMs The next time in milliseconds. + * @param groupEpoch The associated group epoch. + */ +public void setNextMetadataRefreshTime( +long nextTimeMs, +int groupEpoch +) { +this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, groupEpoch); +} + +/** + * Resets the next metadata refresh. + */ +public void resetNextMetadataRefreshTime() { +this.nextMetadataRefreshTime = TimeAndEpoch.EMPTY; +} + +/** + * Checks if a metadata refresh is required. A refresh is required in two cases: + * 1) The next update time is smaller or equals to the current time; + * 2) The group epoch associated with the next update time is smaller than Review Comment: shouldn't it be "associated with the next update time is larger than"? the "current group epoch" is `groupEpoch()` right ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -728,6 +794,81 @@ public void replay( } consumerGroup.removeMember(memberId); } + +updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames()); +} + +/** + * @return The set of groups subscribed to the topic. + */ +public Set groupsSubscribedToTopic(String topicName) { +Set groups = groupsByTopics.get(topicName); Review Comment: any reason we don't use `getOrDefault()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
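A minimal sketch of the deadline semantics under discussion, assuming the deadline stores the group epoch it was written with; following the reviewer's reading, a refresh is required when the deadline has passed or when the stored epoch is ahead of the current group epoch (meaning the write that set it was never committed). This is an illustration, not the actual ConsumerGroup code.

```java
final class MetadataRefreshDeadlineSketch {
    private final long deadlineMs;
    private final int epoch;

    MetadataRefreshDeadlineSketch(long deadlineMs, int epoch) {
        this.deadlineMs = deadlineMs;
        this.epoch = epoch;
    }

    /** True when the subscription metadata must be recomputed. */
    boolean hasMetadataExpired(long currentTimeMs, int currentGroupEpoch) {
        // Case 1: the deadline has passed.
        // Case 2: the deadline was written with an epoch ahead of the current
        // one, i.e. the operation that set it failed and was rolled back.
        return currentTimeMs >= deadlineMs || epoch > currentGroupEpoch;
    }
}
```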
[GitHub] [kafka] jsancio merged pull request #13845: KAFKA-15078; KRaft leader replies with snapshot for offset 0
jsancio merged PR #13845: URL: https://github.com/apache/kafka/pull/13845 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15078) When fetching offset 0 the KRaft leader should respond with SnapshotId
[ https://issues.apache.org/jira/browse/KAFKA-15078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio resolved KAFKA-15078. Fix Version/s: 3.6.0 Resolution: Fixed > When fetching offset 0 the KRaft leader should respond with SnapshotId > --- > > Key: KAFKA-15078 > URL: https://issues.apache.org/jira/browse/KAFKA-15078 > Project: Kafka > Issue Type: Improvement >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Major > Fix For: 3.6.0 > > > With the current implementation, if the follower fetches offset 0 and the > KRaft leader has a record batch at offset 0, it will always send a FETCH > response with records. > If the KRaft log has generated a snapshot, it is always more efficient for the > follower to fetch the snapshot instead. -- This message was sent by Atlassian Jira (v8.20.10#820010)
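The behavior this ticket describes can be sketched as a small decision helper; the types and names below are invented for illustration, and the real KRaft code is considerably more involved. When the follower asks for offset 0 and a snapshot exists, the leader answers with the snapshot id instead of records.

```java
import java.util.Optional;

final class SnapshotFetchSketch {
    record SnapshotId(long endOffset, int epoch) { }

    /**
     * Decide whether a FETCH at the given offset should be answered with a
     * snapshot id rather than with records.
     */
    static Optional<SnapshotId> snapshotToSend(long fetchOffset,
                                               long logStartOffset,
                                               Optional<SnapshotId> latestSnapshot) {
        // Below the log start offset the follower must take a snapshot anyway;
        // at offset 0 with an available snapshot, the snapshot is cheaper too.
        if (fetchOffset < logStartOffset || (fetchOffset == 0 && latestSnapshot.isPresent())) {
            return latestSnapshot;
        }
        return Optional.empty();
    }
}
```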
[GitHub] [kafka] pgjbz opened a new pull request, #13928: KAFKA-15097: prevent server shutdown when source file not exists
pgjbz opened a new pull request, #13928: URL: https://github.com/apache/kafka/pull/13928 Catch NoSuchFileException in atomicMoveWithFallback; this catch block prevents a NoSuchFileException from propagating and shutting down the Kafka server in KRaft mode. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
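A hedged sketch of the kind of guard the PR describes, using only standard java.nio APIs: attempt an atomic move, fall back to a plain move where atomic moves are unsupported, and tolerate a vanished source rather than letting the error propagate and halt the process. Method and class names are illustrative, not the actual Kafka utility.

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class SafeMove {
    /** Moves source to target atomically if possible, tolerating a missing source. */
    static void moveIgnoringMissingSource(Path source, Path target) throws IOException {
        try {
            try {
                Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
            } catch (AtomicMoveNotSupportedException outer) {
                // Fallback for file systems without atomic-move support.
                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (NoSuchFileException e) {
            // Source was already removed (e.g. by a concurrent cleaner);
            // log-and-continue rather than crashing the server.
        }
    }
}
```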
[GitHub] [kafka] C0urante commented on a diff in pull request #13658: KAFKA-14669: Use the generated docs for MirrorMaker configs in the doc
C0urante commented on code in PR #13658: URL: https://github.com/apache/kafka/pull/13658#discussion_r1245804801 ## docs/configuration.html: ## @@ -267,23 +267,31 @@ - 3.8 System Properties + + 3.8 MirrorMaker Configs + Below is the configuration of MirrorMaker. Review Comment: Nit: ```suggestion Below is the configuration of the connectors that make up MirrorMaker 2. ``` ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java: ## @@ -57,7 +59,7 @@ short heartbeatsTopicReplicationFactor() { return getShort(HEARTBEATS_TOPIC_REPLICATION_FACTOR); } -protected static final ConfigDef CONNECTOR_CONFIG_DEF = new ConfigDef(BASE_CONNECTOR_CONFIG_DEF) +protected static final ConfigDef HEARTBEAT_CONFIG_DEF = new ConfigDef() Review Comment: I think we can get away without introducing a utility method to merge `ConfigDefs` by replacing this constant (and others like it) with a method that adds the connector-specific configuration properties to an existing `ConfigDef`: ```java private static ConfigDef defineHeartbeatConfig(ConfigDef baseConfig) { return baseConfig .define( EMIT_HEARTBEATS_ENABLED, ConfigDef.Type.BOOLEAN, EMIT_HEARTBEATS_ENABLED_DEFAULT, ConfigDef.Importance.LOW, EMIT_HEARTBEATS_ENABLED_DOC) // ... ``` ## docs/configuration.html: ## @@ -267,23 +267,31 @@ - 3.8 System Properties + + 3.8 MirrorMaker Configs + Below is the configuration of MirrorMaker. + + Review Comment: We should note the name of the connector here. It'd also be nice if we gave each section (common, source, checkpoint, heartbeat) a subheading, like we do for [source](https://github.com/apache/kafka/blob/3a246b1abab0cfa8050546f54c987af2ec6cdd4e/docs/configuration.html#L254C121-L254C145) and [sink](https://github.com/apache/kafka/blob/3a246b1abab0cfa8050546f54c987af2ec6cdd4e/docs/configuration.html#L258) connectors, and perhaps a brief (one sentence is fine) description of what each connector does and/or a link to other parts of our docs that already provide that info. ## docs/configuration.html: ## @@ -267,23 +267,31 @@ - 3.8 System Properties + Review Comment: Nit: unnecessary extra line ```suggestion ``` ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java: ## @@ -77,7 +79,9 @@ short heartbeatsTopicReplicationFactor() { ConfigDef.Importance.LOW, HEARTBEATS_TOPIC_REPLICATION_FACTOR_DOC); +protected final static ConfigDef CONNECTOR_CONFIG_DEF = new ConfigDef(mergeConnectorConfigDef(HEARTBEAT_CONFIG_DEF)); Review Comment: With the above suggestion, this can now become: ```java protected static final ConfigDef CONNECTOR_CONFIG_DEF = defineHeartbeatConfig(new ConfigDef(BASE_CONNECTOR_CONFIG_DEF)); ``` ## docs/configuration.html: ## @@ -267,23 +267,31 @@ - 3.8 System Properties + + 3.8 MirrorMaker Configs + Below is the configuration of MirrorMaker. + Review Comment: We should note that these are common properties that apply to all three connectors, right? 
## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java: ## @@ -77,7 +79,9 @@ short heartbeatsTopicReplicationFactor() { ConfigDef.Importance.LOW, HEARTBEATS_TOPIC_REPLICATION_FACTOR_DOC); +protected final static ConfigDef CONNECTOR_CONFIG_DEF = new ConfigDef(mergeConnectorConfigDef(HEARTBEAT_CONFIG_DEF)); + public static void main(String[] args) { -System.out.println(CONNECTOR_CONFIG_DEF.toHtml(4, config -> "mirror_heartbeat_" + config)); +System.out.println(HEARTBEAT_CONFIG_DEF.toHtml(4, config -> "mirror_heartbeat_" + config)); Review Comment: With the above suggestion, this can now become: ```java System.out.println(defineHeartbeatConfig(new ConfigDef()).toHtml(4, config -> "mirror_heartbeat_" + config)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
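To illustrate the pattern suggested above -- building each connector's ConfigDef by applying a defining method to a base ConfigDef rather than merging two defs -- here is a hedged, self-contained sketch against the real org.apache.kafka.common.config.ConfigDef API; the config names and defaults are made up for the example.

```java
import org.apache.kafka.common.config.ConfigDef;

final class ConfigDefPattern {
    private static final ConfigDef BASE_CONNECTOR_CONFIG_DEF = new ConfigDef()
        .define("replication.factor", ConfigDef.Type.SHORT, (short) 3,
                ConfigDef.Importance.LOW, "Replication factor for internal topics (illustrative).");

    // The suggested shape: a method that adds connector-specific keys to an existing def.
    private static ConfigDef defineHeartbeatConfig(ConfigDef baseConfig) {
        return baseConfig.define("emit.heartbeats.enabled", ConfigDef.Type.BOOLEAN, true,
                ConfigDef.Importance.LOW, "Whether to emit heartbeats (illustrative).");
    }

    // Full connector def: base plus specific keys, no merge utility needed.
    static final ConfigDef CONNECTOR_CONFIG_DEF =
        defineHeartbeatConfig(new ConfigDef(BASE_CONNECTOR_CONFIG_DEF));

    public static void main(String[] args) {
        // For docs generation, the connector-specific keys alone:
        System.out.println(defineHeartbeatConfig(new ConfigDef())
                .toHtml(4, key -> "mirror_heartbeat_" + key));
    }
}
```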
[GitHub] [kafka] C0urante commented on a diff in pull request #13446: KAFKA-14837, KAFKA-14842: Ignore groups that do not have offsets for filtered topics in MirrorCheckpointConnector
C0urante commented on code in PR #13446: URL: https://github.com/apache/kafka/pull/13446#discussion_r1245885579 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -150,10 +156,31 @@ private void loadInitialConsumerGroups() List findConsumerGroups() throws InterruptedException, ExecutionException { -return listConsumerGroups().stream() +List filteredGroups = listConsumerGroups().stream() .map(ConsumerGroupListing::groupId) -.filter(this::shouldReplicate) +.filter(this::shouldReplicateByGroupFilter) .collect(Collectors.toList()); + +List checkpointGroups = new LinkedList<>(); +List irrelevantGroups = new LinkedList<>(); + +for (String group : filteredGroups) { +Set consumedTopics = listConsumerGroupOffsets(group).keySet().stream() +.map(TopicPartition::topic) +.filter(this::shouldReplicateByTopicFilter) Review Comment: @blacktooth I'll mark this conversation resolved so that users don't get the wrong idea about MM2; feel free to unresolve if you believe there's still an issue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] stevenbooke commented on a diff in pull request #13842: KAFKA-14995: Automate asf.yaml collaborators refresh
stevenbooke commented on code in PR #13842: URL: https://github.com/apache/kafka/pull/13842#discussion_r1245887810 ## refresh-collaborators.py: ## @@ -0,0 +1,44 @@ +import os +from bs4 import BeautifulSoup +from github import Github +import yaml + +### GET THE NAMES OF THE KAFKA COMMITTERS FROM THE apache/kafka-site REPO ### +github_token = os.environ.get('GITHUB_TOKEN') +g = Github(github_token) +repo = g.get_repo("apache/kafka-site") +contents = repo.get_contents("committers.html") +content = contents.decoded_content +soup = BeautifulSoup(content, "html.parser") +committer_logins = [login.text for login in soup.find_all('div', class_='github_login')] + +### GET THE CONTRIBUTORS OF THE apache/kafka REPO ### +n = 10 +repo = g.get_repo("apache/kafka") +contributors = repo.get_contributors() Review Comment: Hello @vvcephei, I understand your concern. I will make changes to account for this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a diff in pull request #13898: KAFKA-14966; [2/N] Extract OffsetFetcher reusable logic
junrao commented on code in PR #13898: URL: https://github.com/apache/kafka/pull/13898#discussion_r1245887623 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java: ## @@ -117,11 +104,6 @@ else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP) * and one or more partitions aren't awaiting a seekToBeginning() or seekToEnd(). */ public void resetPositionsIfNeeded() { -// Raise exception from previous offset fetch if there is one -RuntimeException exception = cachedListOffsetsException.getAndSet(null); Review Comment: This seems a change in existing logic and not just a refactoring. Is this change expected? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java: ## @@ -198,6 +208,11 @@ void validatePositionsOnMetadataChange() { } Map getOffsetResetTimestamp() { +// Raise exception from previous offset fetch if there is one +RuntimeException exception = cachedListOffsetsException.getAndSet(null); Review Comment: This seems a change in existing logic and not just a refactoring. Is this change expected? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java: ## @@ -261,6 +276,73 @@ void updateSubscriptionState(Map
[GitHub] [kafka] stevenbooke commented on a diff in pull request #13842: KAFKA-14995: Automate asf.yaml collaborators refresh
stevenbooke commented on code in PR #13842: URL: https://github.com/apache/kafka/pull/13842#discussion_r1245890763 ## refresh-collaborators.py: ## @@ -0,0 +1,44 @@ +import os +from bs4 import BeautifulSoup +from github import Github +import yaml + +### GET THE NAMES OF THE KAFKA COMMITTERS FROM THE apache/kafka-site REPO ### +github_token = os.environ.get('GITHUB_TOKEN') +g = Github(github_token) +repo = g.get_repo("apache/kafka-site") +contents = repo.get_contents("committers.html") +content = contents.decoded_content +soup = BeautifulSoup(content, "html.parser") +committer_logins = [login.text for login in soup.find_all('div', class_='github_login')] + +### GET THE CONTRIBUTORS OF THE apache/kafka REPO ### +n = 10 +repo = g.get_repo("apache/kafka") +contributors = repo.get_contributors() +collaborators = [] +for contributor in contributors: +if contributor.login not in committer_logins: +collaborators += [contributor.login] +refreshed_collaborators = collaborators[:n] + +### UPDATE asf.yaml ### +file_path = ".asf.yaml" +file = repo.get_contents(file_path) +yaml_content = yaml.safe_load(file.decoded_content) + +# Update 'github_whitelist' list +github_whitelist = refreshed_collaborators[:10] # New users to be added +yaml_content["jenkins"]["github_whitelist"] = github_whitelist + +# Update 'collaborators' list +collaborators = refreshed_collaborators[:10] # New collaborators to be added +yaml_content["github"]["collaborators"] = collaborators + +# Convert the updated content back to YAML +updated_yaml = yaml.safe_dump(yaml_content) + +# Commit and push the changes +commit_message = "Update .asf.yaml file with refreshed github_whitelist, and collaborators" Review Comment: Will do. ## refresh-collaborators.py: ## @@ -0,0 +1,44 @@ +import os +from bs4 import BeautifulSoup +from github import Github +import yaml + +### GET THE NAMES OF THE KAFKA COMMITTERS FROM THE apache/kafka-site REPO ### +github_token = os.environ.get('GITHUB_TOKEN') +g = Github(github_token) +repo = g.get_repo("apache/kafka-site") +contents = repo.get_contents("committers.html") +content = contents.decoded_content +soup = BeautifulSoup(content, "html.parser") +committer_logins = [login.text for login in soup.find_all('div', class_='github_login')] + +### GET THE CONTRIBUTORS OF THE apache/kafka REPO ### +n = 10 +repo = g.get_repo("apache/kafka") +contributors = repo.get_contributors() +collaborators = [] +for contributor in contributors: +if contributor.login not in committer_logins: +collaborators += [contributor.login] +refreshed_collaborators = collaborators[:n] + +### UPDATE asf.yaml ### +file_path = ".asf.yaml" +file = repo.get_contents(file_path) +yaml_content = yaml.safe_load(file.decoded_content) + +# Update 'github_whitelist' list +github_whitelist = refreshed_collaborators[:10] # New users to be added Review Comment: Will fix that. ## refresh-collaborators.py: ## @@ -0,0 +1,44 @@ +import os Review Comment: Will do. ## .github/workflows/refresh-collaborators.yaml: ## @@ -0,0 +1,24 @@ +name: Refresh asf.yaml collaborators every 3 months Review Comment: Will do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on pull request #13917: MINOR; Failed move should be logged at WARN
jsancio commented on PR #13917: URL: https://github.com/apache/kafka/pull/13917#issuecomment-1612214700 > @jsancio Another difference is that now the `outer` exception's stack trace will be shown via `WARN` instead of just the exception's message via `DEBUG`. I assume that's intentional, but just wanted to call it out. Thanks! Yes @kirktrue, that was intentional. I would like to see the thread stack when the atomic move fails. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio merged pull request #13917: MINOR; Failed move should be logged at WARN
jsancio merged PR #13917: URL: https://github.com/apache/kafka/pull/13917 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] xiaocairush commented on pull request #13928: KAFKA-15097: prevent server shutdown when source file not exists
xiaocairush commented on PR #13928: URL: https://github.com/apache/kafka/pull/13928#issuecomment-1612416149 Hi @pgjbz, could you kindly share which thread deleted the log file? I'm new to the Kafka source code and cannot find the root cause. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac merged pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation
dajac merged PR #13880: URL: https://github.com/apache/kafka/pull/13880 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hudeqi opened a new pull request, #13929: KAFKA-15129;[3/N] Remove metrics in AbstractFetcherManager when fetcher manager instance shutdown
hudeqi opened a new pull request, #13929: URL: https://github.com/apache/kafka/pull/13929 This PR removes the metrics in `AbstractFetcherManager` when a `ReplicaFetcherManager`/`ReplicaAlterLogDirsManager` instance shuts down. It has passed the corresponding unit tests, and it is part of [KAFKA-15129](https://issues.apache.org/jira/browse/KAFKA-15129). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
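The PR's theme -- register gauges when a manager starts, remove them when it shuts down -- can be sketched with the client-side org.apache.kafka.common.metrics.Metrics class. The broker code uses KafkaMetricsGroup instead, so treat this as an illustration of the lifecycle pattern, not the actual patch; the metric and group names are made up.

```java
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import java.util.concurrent.atomic.AtomicInteger;

final class FetcherManagerMetricsSketch implements AutoCloseable {
    private final Metrics metrics = new Metrics();
    private final AtomicInteger failedPartitions = new AtomicInteger();
    private final MetricName failedPartitionsCount;

    FetcherManagerMetricsSketch() {
        failedPartitionsCount = metrics.metricName("FailedPartitionsCount", "fetcher-manager-metrics");
        // Gauge backed by live state; registered at startup.
        metrics.addMetric(failedPartitionsCount, (config, now) -> failedPartitions.get());
    }

    @Override
    public void close() {
        // The fix pattern: remove every metric registered above, so
        // shut-down managers do not leak stale gauges.
        metrics.removeMetric(failedPartitionsCount);
        metrics.close();
    }
}
```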