[GitHub] [kafka] JoeCqupt commented on pull request #11089: MINOR: remove unnecessary judgment in AdminUtils::assignReplicasToBrokersRackAware
JoeCqupt commented on pull request #11089: URL: https://github.com/apache/kafka/pull/11089#issuecomment-884671155 call for review @ijuma -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman edited a comment on pull request #11103: HOTFIX: Set session interval back to 10s for StreamsCooperativeRebalanceUpgradeTest
ableegoldman edited a comment on pull request #11103: URL: https://github.com/apache/kafka/pull/11103#issuecomment-884541103 Kicked off system test run with 50 repeats -- https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4613/
[GitHub] [kafka] cmccabe merged pull request #11102: MINOR: Rename the @metadata topic to __cluster_metadata
cmccabe merged pull request #11102: URL: https://github.com/apache/kafka/pull/11102
[GitHub] [kafka] ableegoldman edited a comment on pull request #11103: HOTFIX: Set session interval back to 10s for StreamsCooperativeRebalanceUpgradeTest
ableegoldman edited a comment on pull request #11103: URL: https://github.com/apache/kafka/pull/11103#issuecomment-884541103 Kicked off system test run with 15 repeats -- https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4612/
[GitHub] [kafka] d8tltanc commented on pull request #11002: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings
d8tltanc commented on pull request #11002: URL: https://github.com/apache/kafka/pull/11002#issuecomment-884570483
kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once
kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once_beta
These two tests seem to be related and fail in both runs. Will run them manually locally.
[GitHub] [kafka] d8tltanc commented on pull request #11002: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings
d8tltanc commented on pull request #11002: URL: https://github.com/apache/kafka/pull/11002#issuecomment-884568804
Test run 4604 failures:
test_id: kafkatest.tests.core.zookeeper_authorizer_test.ZooKeeperAuthorizerTest.test_authorizer.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 1 minute 8.011 seconds
test_id: kafkatest.tests.core.fetch_from_follower_test.FetchFromFollowerTest.test_consumer_preferred_read_replica.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 2 minutes 8.750 seconds
test_id: kafkatest.tests.core.upgrade_test.TestUpgrade.test_upgrade.from_kafka_version=0.9.0.1.to_message_format_version=None.compression_types=.lz4 -- status: FAIL -- run time: 4 minutes 7.158 seconds
test_id: kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery.processing_guarantee=exactly_once_beta -- status: FAIL -- run time: 9 minutes 56.437 seconds
test_id: kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once -- status: FAIL -- run time: 10 minutes 8.148 seconds
test_id: kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once_beta -- status: FAIL -- run time: 10 minutes 8.626 seconds
test_id: kafkatest.tests.streams.streams_static_membership_test.StreamsStaticMembershipTest.test_rolling_bounces_will_not_trigger_rebalance_under_static_membership -- status: FAIL -- run time: 2 minutes 15.944 seconds
test_id: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_failure.clean_shutdown=False.enable_autocommit=False.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 1 minute 47.734 seconds
test_id: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_failure.clean_shutdown=True.enable_autocommit=False.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 1 minute 48.715 seconds
test_id: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_rolling_bounce.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 4 minutes 27.675 seconds
test_id: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_static_consumer_persisted_after_rejoin.bounce_mode=all.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 2 minutes 0.632 seconds
test_id: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_static_consumer_persisted_after_rejoin.bounce_mode=rolling.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 4 minutes 36.821 seconds
test_id: kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_bounce.broker_type=leader.security_protocol=PLAINTEXT.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 4 minutes 59.358 seconds
test_id: kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_bounce.security_protocol=PLAINTEXT.broker_type=leader.compression_type=gzip.tls_version=TLSv1.2.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 3 minutes 10.650 seconds
test_id: kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_bounce.broker_type=leader.security_protocol=SASL_SSL.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 5 minutes 46.542 seconds
test_id: kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_bounce.security_protocol=PLAINTEXT.broker_type=leader.compression_type=gzip.tls_version=TLSv1.3.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 3 minutes 11.345 seconds
test_id: kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=hard_bounce.broker_type=leader.security_protocol=PLAINTEXT.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 6 minutes 13.538 seconds
test_id: kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=hard_bounce.broker_type=leader.security_protocol=SASL_SSL.client_sasl_mechanism=PLAIN.interbroker_sasl_mechanism=GSSAPI.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 5 minutes 28.945 seconds
test_id: kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=hard_bounce.broker_type=leader.security_protocol=SASL_SSL.client_sasl_mechanism=PLAIN.interbroker_sasl_mechanism=PLAIN.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 5 minutes 26.515 seconds
[GitHub] [kafka] d8tltanc commented on pull request #11002: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings
d8tltanc commented on pull request #11002: URL: https://github.com/apache/kafka/pull/11002#issuecomment-884566723
System test run 4607 failures:
test_id: kafkatest.tests.core.zookeeper_authorizer_test.ZooKeeperAuthorizerTest.test_authorizer.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 58.147 seconds
test_id: kafkatest.tests.core.fetch_from_follower_test.FetchFromFollowerTest.test_consumer_preferred_read_replica.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 2 minutes 8.162 seconds
test_id: kafkatest.tests.core.transactions_test.TransactionsTest.test_transactions.failure_mode=hard_bounce.bounce_target=clients.check_order=True.use_group_metadata=True -- status: FAIL -- run time: 2 minutes 16.481 seconds
test_id: kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once -- status: FAIL -- run time: 10 minutes 12.060 seconds
test_id: kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once_beta -- status: FAIL -- run time: 10 minutes 6.530 seconds
test_id: kafkatest.tests.streams.streams_smoke_test.StreamsSmokeTest.test_streams.processing_guarantee=at_least_once.crash=False.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 4 minutes 7.642 seconds
test_id: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_metadata_upgrade.from_version=0.10.2.2.to_version=3.1.0-SNAPSHOT -- status: FAIL -- run time: 8 minutes 11.668 seconds
test_id: kafkatest.tests.streams.streams_static_membership_test.StreamsStaticMembershipTest.test_rolling_bounces_will_not_trigger_rebalance_under_static_membership -- status: FAIL -- run time: 2 minutes 15.825 seconds
test_id: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_failure.clean_shutdown=False.enable_autocommit=True.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 1 minute 20.948 seconds
test_id: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 1 minute 15.040 seconds
test_id: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_version_probing_upgrade -- status: FAIL -- run time: 2 minutes 11.466 seconds
test_id: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_rolling_bounce.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 4 minutes 25.921 seconds
test_id: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_static_consumer_persisted_after_rejoin.bounce_mode=rolling.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 3 minutes 34.687 seconds
test_id: kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_bounce.broker_type=leader.security_protocol=PLAINTEXT.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 4 minutes 21.836 seconds
test_id: kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_bounce.broker_type=leader.security_protocol=SASL_SSL.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 4 minutes 43.409 seconds
test_id: kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_bounce.security_protocol=PLAINTEXT.broker_type=leader.compression_type=gzip.tls_version=TLSv1.3.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 2 minutes 44.057 seconds
test_id: kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_bounce.security_protocol=PLAINTEXT.broker_type=leader.compression_type=gzip.tls_version=TLSv1.2.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 5 minutes 47.045 seconds
test_id: kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_shutdown.security_protocol=PLAINTEXT.broker_type=leader.compression_type=gzip.tls_version=TLSv1.2.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 2 minutes 0.945 seconds
test_id: kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=hard_bounce.broker_type=leader.security_protocol=PLAINTEXT.metadata_quorum=REMOTE_KRAFT -- status: FAIL -- run time: 6 minutes 43.349 seconds
test_id: kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=hard_bounce.broker_
[GitHub] [kafka] ableegoldman commented on pull request #11103: HOTFIX: Set session interval back to 10s for StreamsCooperativeRebalanceUpgradeTest
ableegoldman commented on pull request #11103: URL: https://github.com/apache/kafka/pull/11103#issuecomment-884541103 Kicked off system test run with 30 repeats -- https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4611/
[GitHub] [kafka] jolshan commented on a change in pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs
jolshan commented on a change in pull request #11104: URL: https://github.com/apache/kafka/pull/11104#discussion_r674387908

## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ##

@@ -332,6 +329,9 @@ public FetchRequestData build() {
                 iter.remove();
                 // Indicate that we no longer want to listen to this partition.
                 removed.add(topicPartition);
+                // If we do not have this topic ID in the session, we can not use topic IDs

Review comment: One alternative to this approach is to only do this check when no partitions were added to the builder, but I'm not sure that added complexity is worth it; the approach here is also safer.
[GitHub] [kafka] jolshan opened a new pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs
jolshan opened a new pull request #11104: URL: https://github.com/apache/kafka/pull/11104

The FetchSessionHandler had a small bug in the session build method: it did not account for building a session in which no partitions were added and the session previously did not use topic IDs (i.e., it relied on at least one partition being added to signal whether topic IDs were present). As a result, we could send forgotten partitions with the zero UUID, which would always result in an exception and a closed session. This PR fixes the logic to check that all forgotten partitions have topic IDs. Tests are also added for the empty-session case both when topic IDs are used and when topic names are used.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
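The failure mode described above can be illustrated with a small, self-contained sketch. All class and method names below are hypothetical stand-ins, not Kafka's actual FetchSessionHandler code; the point is that deciding whether topic IDs are usable must consider the forgotten partitions too, because the set of added partitions may be empty.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the FetchSessionHandler issue: whether a fetch
// request can use topic IDs must be decided from *forgotten* topics as well,
// not only from the topics being added (which may be an empty set).
public class FetchSessionSketch {
    // Kafka uses an all-zero UUID as the sentinel for "no topic ID".
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Buggy shape: with no added topics this vacuously returns true,
    // so forgotten topics could be serialized with the zero UUID.
    static boolean canUseTopicIdsBuggy(Map<String, UUID> addedTopicIds) {
        return addedTopicIds.values().stream().noneMatch(ZERO_UUID::equals);
    }

    // Fixed shape: every forgotten topic must also have a real ID.
    static boolean canUseTopicIds(Map<String, UUID> addedTopicIds,
                                  Map<String, UUID> forgottenTopicIds) {
        return addedTopicIds.values().stream().noneMatch(ZERO_UUID::equals)
            && forgottenTopicIds.values().stream().noneMatch(ZERO_UUID::equals);
    }

    public static void main(String[] args) {
        Map<String, UUID> added = Map.of();                            // empty session update
        Map<String, UUID> forgotten = Map.of("old-topic", ZERO_UUID);  // topic with no ID
        System.out.println(canUseTopicIdsBuggy(added));        // true  (the bug)
        System.out.println(canUseTopicIds(added, forgotten));  // false (the fix)
    }
}
```

The real fix operates on Kafka's session data structures rather than bare maps, but the shape of the check is the same.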
[GitHub] [kafka] ableegoldman opened a new pull request #11103: HOTFIX: Set session interval back to 10s for StreamsCooperativeRebalanceUpgradeTest
ableegoldman opened a new pull request #11103: URL: https://github.com/apache/kafka/pull/11103

This test is hitting fairly frequent timeouts after bouncing a node and waiting for it to come back and fully rejoin the group. It now seems to take 45s for the initial JoinGroup to succeed, which I suspect is due to the new default session.timeout.ms (recently increased to 45s). Let's try pinning this config to the old value of 10s and see if that helps it rejoin in time.
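For reference, pinning the timeout in a test config might look like the following sketch. It assumes the standard consumer config key `session.timeout.ms` with a millisecond value; the class name and the surrounding Streams test wiring are hypothetical.

```java
import java.util.Properties;

// Sketch: pinning the consumer session timeout back to the pre-3.0
// default of 10s for a test, instead of the newer 45s default.
public class SessionTimeoutPin {
    static Properties streamsTestConfig() {
        Properties props = new Properties();
        // "session.timeout.ms" is the consumer's session timeout config key.
        props.put("session.timeout.ms", "10000"); // 10s, the old default
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsTestConfig().getProperty("session.timeout.ms"));
    }
}
```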
[GitHub] [kafka] ijuma commented on pull request #11101: MINOR: Remove redundant fields in dump log record output
ijuma commented on pull request #11101: URL: https://github.com/apache/kafka/pull/11101#issuecomment-884537914 Can we add a test to DumpLogSegmentsTest? Otherwise, I fear we will regress again in the future.
[GitHub] [kafka] niket-goel commented on pull request #11053: KAFKA-13015 Ducktape System Tests for Metadata Snapshots
niket-goel commented on pull request #11053: URL: https://github.com/apache/kafka/pull/11053#issuecomment-884534694 Rebased the branch on trunk.
[GitHub] [kafka] niket-goel opened a new pull request #11102: Renaming the internal metadata topic to "__cluster_metadata"
niket-goel opened a new pull request #11102: URL: https://github.com/apache/kafka/pull/11102 This PR renames the internal metadata topic to "__cluster_metadata". The topic was previously known as "@metadata".
[GitHub] [kafka] mumrah commented on a change in pull request #11101: MINOR: Remove redundant fields in dump log record output
mumrah commented on a change in pull request #11101: URL: https://github.com/apache/kafka/pull/11101#discussion_r674342185

## File path: core/src/main/scala/kafka/tools/DumpLogSegments.scala ##

@@ -268,13 +268,10 @@ object DumpLogSegments {
         }
         lastOffset = record.offset
-        var prefix = s"${RecordIndent} "
+        var prefix = s"$RecordIndent "
         if (!skipRecordMetadata) {
-          print(s"${prefix}offset: ${record.offset}" +
-            s" keySize: ${record.keySize} valueSize: ${record.valueSize} ${batch.timestampType}: ${record.timestamp}" +
-            s" baseOffset: ${batch.baseOffset} lastOffset: ${batch.lastOffset} baseSequence: ${batch.baseSequence}" +
-            s" lastSequence: ${batch.lastSequence} producerEpoch: ${batch.producerEpoch} partitionLeaderEpoch: ${batch.partitionLeaderEpoch}" +
-            s" batchSize: ${batch.sizeInBytes} magic: ${batch.magic} compressType: ${batch.compressionType} position: ${validBytes}")
+          print(s"${prefix}offset: ${record.offset} ${batch.timestampType}: ${record.timestamp} " +
+            s"keysize: ${record.keySize} valuesize: ${record.valueSize}")

Review comment: nit: keySize and valueSize?
[GitHub] [kafka] ableegoldman commented on pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread
ableegoldman commented on pull request #10921: URL: https://github.com/apache/kafka/pull/10921#issuecomment-884482756 Merged to trunk and cherry-picked to 2.8 and 3.0 (cc @kkonstantine)
[jira] [Commented] (KAFKA-13121) Flaky Test TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates()
[ https://issues.apache.org/jira/browse/KAFKA-13121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17385093#comment-17385093 ]

A. Sophie Blee-Goldman commented on KAFKA-13121:

Hey [~jolshan], thought you might have some context on this since it (sort of) has to do with topic ids. The full logs aren't much, but I'll upload them in case they help: [^TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates.rtf]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13121) Flaky Test TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates()
[ https://issues.apache.org/jira/browse/KAFKA-13121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-13121:
Attachment: TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates.rtf
[jira] [Created] (KAFKA-13121) Flaky Test TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates()
A. Sophie Blee-Goldman created KAFKA-13121:

Summary: Flaky Test TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates()
Key: KAFKA-13121
URL: https://issues.apache.org/jira/browse/KAFKA-13121
Project: Kafka
Issue Type: Bug
Components: log
Reporter: A. Sophie Blee-Goldman

h4. Stack Trace
{code:java}
org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: No resource found for partition: TopicIdPartition{topicId=2B9rDu44TE6c8pLG8A0RAg, topicPartition=new-leader-0}
    at org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.getRemoteLogMetadataCache(RemotePartitionMetadataStore.java:112)
    at org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.listRemoteLogSegments(RemotePartitionMetadataStore.java:98)
    at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.listRemoteLogSegments(TopicBasedRemoteLogMetadataManager.java:212)
    at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates(TopicBasedRemoteLogMetadataManagerTest.java:99)
{code}
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10921/11/testReport/
[jira] [Updated] (KAFKA-13120) Flesh out `streams_static_membership_test` to be more robust
[ https://issues.apache.org/jira/browse/KAFKA-13120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Leah Thomas updated KAFKA-13120:

Description: When fixing `streams_static_membership_test.py` we noticed that the test is pretty bare-bones: it creates a Streams application but doesn't do much with it, e.g. it has no stateful processing. We should flesh this out a bit to be more realistic, and potentially consider testing with EOS as well. The full Java test is in `StaticMembershipTestClient`.

(was: When fixing `streams_static_membership_test.py` we noticed that the test is pretty bare-bones: it creates a Streams application but doesn't ever send data through it or do much with it. We should flesh this out a bit to be more realistic. The full Java test is in `StaticMembershipTestClient`.)
[GitHub] [kafka] ableegoldman merged pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread
ableegoldman merged pull request #10921: URL: https://github.com/apache/kafka/pull/10921
[GitHub] [kafka] ableegoldman commented on a change in pull request #10881: KAFKA-12947 Replace EasyMock and PowerMock with Mockito for StreamsMe…
ableegoldman commented on a change in pull request #10881: URL: https://github.com/apache/kafka/pull/10881#discussion_r674307316

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java ##

@@ -23,43 +23,32 @@
 import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.kstream.internals.graph.TableSourceNode.TableSourceNodeBuilder;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.easymock.EasyMock;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;

 import java.util.Properties;

-@RunWith(PowerMockRunner.class)
-@PrepareForTest({InternalTopologyBuilder.class})
+import static org.mockito.Mockito.mock;
+
 public class TableSourceNodeTest {

     private static final String STORE_NAME = "store-name";
     private static final String TOPIC = "input-topic";

-    private final InternalTopologyBuilder topologyBuilder = PowerMock.createNiceMock(InternalTopologyBuilder.class);
+    private final InternalTopologyBuilder topologyBuilder = mock(InternalTopologyBuilder.class);

     @Test
     public void shouldConnectStateStoreToInputTopicIfInputTopicIsUsedAsChangelog() {
         final boolean shouldReuseSourceTopicForChangelog = true;
         topologyBuilder.connectSourceStoreAndTopic(STORE_NAME, TOPIC);
-        EasyMock.replay(topologyBuilder);

         buildTableSourceNode(shouldReuseSourceTopicForChangelog);
-
-        EasyMock.verify(topologyBuilder);

Review comment: Admittedly I don't know much about Mockito, but it looks like we're removing the one verification step that's actually testing something in this test without replacing it. Isn't there a `verify()` for Mockito?
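Mockito does have an equivalent: `verify(mock).someMethod(args)` asserts that the invocation was actually recorded on the mock. The following self-contained sketch shows what that verification checks, using a hand-rolled recording stub with hypothetical names (real Mockito generates the recording mock for you, so you would not write this class yourself):

```java
import java.util.ArrayList;
import java.util.List;

// Hand-rolled stand-in illustrating what Mockito's verify(mock).method(args)
// checks: that an expected invocation was recorded on the mock.
public class VerifySketch {
    // Minimal recording stub (hypothetical; Mockito generates this machinery).
    static class RecordingTopologyBuilder {
        final List<String> invocations = new ArrayList<>();

        void connectSourceStoreAndTopic(String storeName, String topic) {
            invocations.add("connectSourceStoreAndTopic(" + storeName + ", " + topic + ")");
        }

        // Moral equivalent of verify(mock).connectSourceStoreAndTopic(storeName, topic):
        boolean wasCalledWith(String storeName, String topic) {
            return invocations.contains("connectSourceStoreAndTopic(" + storeName + ", " + topic + ")");
        }
    }

    public static void main(String[] args) {
        RecordingTopologyBuilder builder = new RecordingTopologyBuilder();
        // The code under test would make this call internally.
        builder.connectSourceStoreAndTopic("store-name", "input-topic");
        System.out.println(builder.wasCalledWith("store-name", "input-topic")); // true
    }
}
```

In the real test, the line `EasyMock.verify(topologyBuilder)` would become `verify(topologyBuilder).connectSourceStoreAndTopic(STORE_NAME, TOPIC)` with a static import of `org.mockito.Mockito.verify`.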
[GitHub] [kafka] ableegoldman edited a comment on pull request #11093: MINOR: add serde configs to properly set serdes in failing StreamsStaticMembershipTest
ableegoldman edited a comment on pull request #11093: URL: https://github.com/apache/kafka/pull/11093#issuecomment-884457416 Merged to trunk and cherry-picked to 3.0 (cc @kkonstantine -- it's a system test fix)
[GitHub] [kafka] ableegoldman commented on pull request #10881: KAFKA-12947 Replace EasyMock and PowerMock with Mockito for StreamsMe…
ableegoldman commented on pull request #10881: URL: https://github.com/apache/kafka/pull/10881#issuecomment-884466044 cc also @cadonna, who I think is familiar with these specific tests
[GitHub] [kafka] ableegoldman commented on pull request #11093: MINOR: add serde configs to properly set serdes in failing StreamsStaticMembershipTest
ableegoldman commented on pull request #11093: URL: https://github.com/apache/kafka/pull/11093#issuecomment-884457416 Merged to trunk and cherry-picked to 3.0
[GitHub] [kafka] ableegoldman merged pull request #11093: MINOR: add serde configs to properly set serdes in failing StreamsStaticMembershipTest
ableegoldman merged pull request #11093: URL: https://github.com/apache/kafka/pull/11093
[GitHub] [kafka] ableegoldman commented on pull request #11093: FIX: add serde configs to properly set serde
ableegoldman commented on pull request #11093: URL: https://github.com/apache/kafka/pull/11093#issuecomment-884455762 Just some unrelated/known flaky test failures: `ConsumerBounceTest.testCloseDuringRebalance()` and `TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation` (the latter was recently fixed and the error message in the failure does not match the new code)
[GitHub] [kafka] jeqo commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API
jeqo commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r674249547

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
## @@ -98,7 +98,7 @@ public String newStoreName(final String prefix) {
                 null,
                 GLOBAL_STORE_TOPIC_NAME,
                 "processorName",
-                () -> ProcessorAdapter.adapt(new KTableSource<>(GLOBAL_STORE_NAME, GLOBAL_STORE_NAME).get()));
+                () -> new KTableSource<>(GLOBAL_STORE_NAME, GLOBAL_STORE_NAME).get());

Review comment: Oh! Great catch! You're right; I've rolled back the change and the test is working.
[GitHub] [kafka] jeqo commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API
jeqo commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r674249144

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
## @@ -92,33 +97,36 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
 }

 @Override
-public void process(final K key, final V value) {
+public void process(final Record record) {
     // if the key is null, then ignore the record
-    if (key == null) {
-        LOG.warn(
+    if (record.key() == null) {
+        context.recordMetadata().ifPresent(recordMetadata -> LOG.warn(

Review comment: Agreed. I added some more context to the messages and made them consistent across the class. Let me know how it looks.
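The pattern in the hunk above, dropping null-key records and logging their origin only when record metadata is available, can be sketched in plain Python. This is a hedged illustration only: the dict-based record, metadata, and sensor shapes are invented for the sketch and are not Kafka Streams APIs.

```python
# Hypothetical sketch of the null-key skip logic discussed above (not the
# actual KTableSource code). Record metadata may be absent, e.g. for records
# that did not come directly from a topic, so the warning is conditional.
def process(record, record_metadata, sensor, warnings):
    if record["key"] is None:
        if record_metadata is not None:
            # only log topic/partition/offset when metadata is present,
            # mirroring the Optional-based recordMetadata() pattern
            warnings.append(
                "Skipping record due to null key. topic=[%s] partition=[%s] offset=[%s]"
                % (record_metadata["topic"], record_metadata["partition"],
                   record_metadata["offset"]))
        sensor["dropped"] += 1  # the dropped-records sensor fires either way
        return None
    return record["value"]

sensor = {"dropped": 0}
warnings = []
result = process({"key": None, "value": "v"},
                 {"topic": "t", "partition": 0, "offset": 5},
                 sensor, warnings)
print(result, sensor["dropped"], len(warnings))  # → None 1 1
```

Note that the sensor is recorded whether or not metadata is present; only the log line depends on it, which matches the review discussion.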
[GitHub] [kafka] iakunin edited a comment on pull request #10775: KAFKA-12668: Making MockScheduler.schedule safe to use in concurrent code
iakunin edited a comment on pull request #10775: URL: https://github.com/apache/kafka/pull/10775#issuecomment-884402579 @jsancio got it. Thanks for explaining that to me. I'll be waiting patiently :)
[GitHub] [kafka] iakunin commented on pull request #10775: KAFKA-12668: Making MockScheduler.schedule safe to use in concurrent code
iakunin commented on pull request #10775: URL: https://github.com/apache/kafka/pull/10775#issuecomment-884402579 @jsancio got it. Thanks for explaining that to me. I will wait patiently :)
[GitHub] [kafka] hachikuji opened a new pull request #11101: MINOR: Remove redundant fields in dump log record output
hachikuji opened a new pull request #11101:
URL: https://github.com/apache/kafka/pull/11101

In 2.8, the dump log output regressed to print batch-level information for each record, which makes the output considerably noisier. This patch changes the output back to what it was in 2.7 and previous versions: batch metadata is printed only at the batch level.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
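The intended output shape can be modeled roughly as follows. This is a simplified, hypothetical sketch, not the actual DumpLogSegments code; the field names are illustrative.

```python
# Hedged sketch of the "batch metadata at the batch level" idea from the PR:
# batch-level fields are printed once per batch, and each record line carries
# only record-level fields, instead of repeating the batch metadata.
def dump(batches):
    lines = []
    for batch in batches:
        # batch metadata appears exactly once, on the batch header line
        lines.append("baseOffset: %d count: %d producerId: %d"
                     % (batch["baseOffset"], len(batch["records"]),
                        batch["producerId"]))
        for rec in batch["records"]:
            # record lines repeat only their own fields
            lines.append("| offset: %d keySize: %d"
                         % (rec["offset"], rec["keySize"]))
    return lines

batches = [{"baseOffset": 0, "producerId": -1,
            "records": [{"offset": 0, "keySize": 3},
                        {"offset": 1, "keySize": 5}]}]
out = dump(batches)
for line in out:
    print(line)
```

The regression being fixed is equivalent to moving the batch header fields into the per-record loop, which multiplies the output size by the batch record count.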
[jira] [Created] (KAFKA-13120) Flesh out `streams_static_membership_test` to be more robust
Leah Thomas created KAFKA-13120:
-------------------------------
             Summary: Flesh out `streams_static_membership_test` to be more robust
                 Key: KAFKA-13120
                 URL: https://issues.apache.org/jira/browse/KAFKA-13120
             Project: Kafka
          Issue Type: Task
          Components: streams, system tests
            Reporter: Leah Thomas

While fixing `streams_static_membership_test.py` we noticed that the test is pretty bare-bones: it creates a Streams application but never sends data through it or does much with it. We should flesh this out a bit to be more realistic. The full Java test is in `StaticMembershipTestClient`.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe commented on a change in pull request #11053: KAFKA-13015 Ducktape System Tests for Metadata Snapshots
cmccabe commented on a change in pull request #11053:
URL: https://github.com/apache/kafka/pull/11053#discussion_r674207120

## File path: tests/kafkatest/tests/core/snapshot_test.py
## @@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    METADATA_LOG_RETENTION_BYTES = "4096"
+    METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "4096"
+    METADATA_LOG_SEGMENT_BYTES = "900"
+    METADATA_LOG_SEGMENT_MS = "1"
+
+    def __init__(self, test_context):
+        super(TestSnapshots, self).__init__(test_context=test_context)
+        self.topics_created = 0
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+        self.num_nodes = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        security_protocol = 'PLAINTEXT'
+        # Setup Custom Config to ensure snapshot will be generated deterministically
+        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+                                  topics={self.topic: {"partitions": self.partitions,
+                                                       "replication-factor": self.replication_factor,
+                                                       'configs': {"min.insync.replicas": 2}}},
+                                  server_prop_overrides=[
+                                      [config_property.METADATA_LOG_DIR, KafkaService.METADATA_LOG_DIR],
+                                      [config_property.METADATA_LOG_SEGMENT_MS, TestSnapshots.METADATA_LOG_SEGMENT_MS],
+                                      [config_property.METADATA_LOG_RETENTION_BYTES, TestSnapshots.METADATA_LOG_RETENTION_BYTES],
+                                      [config_property.METADATA_LOG_SEGMENT_BYTES, TestSnapshots.METADATA_LOG_SEGMENT_BYTES],
+                                      [config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, TestSnapshots.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS]
+                                  ])
+
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.security_protocol = security_protocol
+
+    def setUp(self):
+        # Start the cluster and ensure that a snapshot is generated
+        self.logger.info("Starting the cluster and running until snapshot creation")
+
+        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+            "Snapshot test should be run Kraft Modes only"
+
+        self.kafka.start()
+
+        topic_count = 10
+        self.topics_created += self.create_n_topics(topic_count)
+
+        if self.kafka.remote_controller_quorum:
+            controller_nodes = self.kafka.remote_controller_quorum.nodes
+        else:
+            controller_nodes = self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+        # Waiting for snapshot creation and first log segment
+        # cleanup on all controller nodes
+        for node in controller_nodes:
+            self.logger.debug("Waiting for snapshot on: %s" % self.kafka.who_am_i(node))
+            self.wait_for_log_segment_delete(node)
+            self.wait_for_snapshot(node)
+        self.logger.debug("Verified Snapshots exist on controller nodes")
+
+    def create_n_topics(self, topic_count):
+        for i in range(self.topics_created, topic_count):
+            topic = "test_topic_%d" % i
+            print("Creating topic %s" % topic, flush=True)
+            topic_cfg = {
+                "topic": topic,
+                "partitions": self.partitions,
+
[GitHub] [kafka] cmccabe commented on a change in pull request #11053: KAFKA-13015 Ducktape System Tests for Metadata Snapshots
cmccabe commented on a change in pull request #11053:
URL: https://github.com/apache/kafka/pull/11053#discussion_r674205413

## File path: tests/kafkatest/tests/core/snapshot_test.py
## @@ -0,0 +1,224 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+import time
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    METADATA_LOG_RETENTION_BYTES = "2048"
+    METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "2048"
+    METADATA_LOG_SEGMENT_BYTES = "900"
+    METADATA_LOG_SEGMENT_MS = "1"
+    TOPIC_NAME_PREFIX = "test_topic_"
+
+    def __init__(self, test_context):
+        super(TestSnapshots, self).__init__(test_context=test_context)
+        self.topics_created = 0
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+        self.num_nodes = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        security_protocol = 'PLAINTEXT'
+        # Setup Custom Config to ensure snapshot will be generated deterministically
+        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+                                  topics={self.topic: {"partitions": self.partitions,
+                                                       "replication-factor": self.replication_factor,
+                                                       'configs': {"min.insync.replicas": 2}}},
+                                  server_prop_overrides=[
+                                      [config_property.METADATA_LOG_DIR, KafkaService.METADATA_LOG_DIR],
+                                      [config_property.METADATA_LOG_SEGMENT_MS, TestSnapshots.METADATA_LOG_SEGMENT_MS],
+                                      [config_property.METADATA_LOG_RETENTION_BYTES, TestSnapshots.METADATA_LOG_RETENTION_BYTES],
+                                      [config_property.METADATA_LOG_SEGMENT_BYTES, TestSnapshots.METADATA_LOG_SEGMENT_BYTES],
+                                      [config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, TestSnapshots.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS]
+                                  ])
+
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.security_protocol = security_protocol
+
+    def setUp(self):
+        # Start the cluster and ensure that a snapshot is generated
+        self.logger.info("Starting the cluster and running until snapshot creation")
+
+        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+            "Snapshot test should be run Kraft Modes only"
+
+        self.kafka.start()
+
+        topic_count = 10
+        self.topics_created += self.create_n_topics(topic_count)
+
+        if self.kafka.remote_controller_quorum:
+            self.controller_nodes = self.kafka.remote_controller_quorum.nodes
+        else:
+            self.controller_nodes = self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+        # Waiting for snapshot creation and first log segment
+        # cleanup on all controller nodes
+        for node in self.controller_nodes:
+            self.logger.debug("Waiting for snapshot on: %s" % self.kafka.who_am_i(node))
+            self.wait_for_log_segment_delete(node)
+            self.wait_for_snapshot(node)
+        self.logger.debug("Verified Snapshots exist on controller nodes")
+
+    def create_n_topics(self, topic_count):
+        for i in range(self.topics_created, topic_count):
+            topic = "%s%d" % (TestSnapshots.TOPIC_NAME_PREFIX, i)
+            self.logger.debug("Creating topic %s" % topic)
+            topic_cfg = {
+
[GitHub] [kafka] cmccabe commented on a change in pull request #11053: KAFKA-13015 Ducktape System Tests for Metadata Snapshots
cmccabe commented on a change in pull request #11053:
URL: https://github.com/apache/kafka/pull/11053#discussion_r674203880

## File path: tests/kafkatest/tests/core/snapshot_test.py
## @@ -0,0 +1,224 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+import time
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    METADATA_LOG_RETENTION_BYTES = "2048"

Review comment: I think you can just put the constants later on in the `server_prop_overrides` block. There's no reason to have a special name for them, since they're only used in one place (which is itself a constant).
[GitHub] [kafka] cmccabe commented on a change in pull request #11053: KAFKA-13015 Ducktape System Tests for Metadata Snapshots
cmccabe commented on a change in pull request #11053:
URL: https://github.com/apache/kafka/pull/11053#discussion_r674202902

## File path: tests/kafkatest/services/kafka/config.py
## @@ -24,6 +24,11 @@ class KafkaConfig(dict):
 DEFAULTS = {
     config_property.SOCKET_RECEIVE_BUFFER_BYTES: 65536,
     config_property.LOG_DIRS: "/mnt/kafka/kafka-data-logs-1,/mnt/kafka/kafka-data-logs-2",
+    config_property.METADATA_LOG_DIR: "/mnt/kafka/kafka-metadata-logs",
+    config_property.METADATA_LOG_SEGMENT_BYTES: 8388608,
+    config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS: 100,

Review comment: Hmm, I'm a bit confused about why we'd make something like "100 bytes between snapshots" the default. I guess the intention is to do a stress test?
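The reviewer's earlier suggestion on this PR, inlining the override values at their single point of use instead of naming each one as a class constant, could look roughly like this. The `config_property` stub class and its literal property-name strings below are illustrative stand-ins, not kafkatest's real module.

```python
# Hedged sketch of the suggested inlining. In the real test these names come
# from kafkatest.services.kafka.config_property; the strings here are
# illustrative stand-ins so the sketch runs on its own.
class config_property:
    METADATA_LOG_DIR = "metadata.log.dir"
    METADATA_LOG_SEGMENT_MS = "metadata.log.segment.ms"
    METADATA_LOG_RETENTION_BYTES = "metadata.log.retention.bytes"
    METADATA_LOG_SEGMENT_BYTES = "metadata.log.segment.bytes"
    METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "metadata.log.max.record.bytes.between.snapshots"

# The override values are inlined where they are used, rather than being
# referenced through TestSnapshots.METADATA_LOG_* class constants.
server_prop_overrides = [
    [config_property.METADATA_LOG_DIR, "/mnt/kafka/kafka-metadata-logs"],
    [config_property.METADATA_LOG_SEGMENT_MS, "1"],
    [config_property.METADATA_LOG_RETENTION_BYTES, "2048"],
    [config_property.METADATA_LOG_SEGMENT_BYTES, "900"],
    [config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, "2048"],
]
```

Since `server_prop_overrides` is itself effectively a constant, giving each value its own name adds indirection without reuse, which is the point of the review comment.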
[jira] [Created] (KAFKA-13119) Validate the KRaft controllerListener config on startup
Colin McCabe created KAFKA-13119:
--------------------------------
             Summary: Validate the KRaft controllerListener config on startup
                 Key: KAFKA-13119
                 URL: https://issues.apache.org/jira/browse/KAFKA-13119
             Project: Kafka
          Issue Type: Bug
            Reporter: Colin McCabe
            Assignee: Niket Goel
             Fix For: 3.0.0
[jira] [Resolved] (KAFKA-13119) Validate the KRaft controllerListener config on startup
[ https://issues.apache.org/jira/browse/KAFKA-13119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin McCabe resolved KAFKA-13119.
----------------------------------
    Resolution: Fixed

> Validate the KRaft controllerListener config on startup
> -------------------------------------------------------
>
>                 Key: KAFKA-13119
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13119
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Colin McCabe
>            Assignee: Niket Goel
>            Priority: Blocker
>             Fix For: 3.0.0
>
[GitHub] [kafka] cmccabe merged pull request #11070: Validate the controllerListener config on startup
cmccabe merged pull request #11070: URL: https://github.com/apache/kafka/pull/11070
[GitHub] [kafka] cmccabe closed pull request #10695: KAFKA-12783: Remove the deprecated ZK-based partition reassignment API
cmccabe closed pull request #10695: URL: https://github.com/apache/kafka/pull/10695
[GitHub] [kafka] cmccabe commented on pull request #10695: KAFKA-12783: Remove the deprecated ZK-based partition reassignment API
cmccabe commented on pull request #10695: URL: https://github.com/apache/kafka/pull/10695#issuecomment-884364747 I guess we're going with the approach from #10471 for 3.0, so I'll close this one.
[jira] [Updated] (KAFKA-13118) Backport KAFKA-9887 to 3.0 branch after 3.0.0 release
[ https://issues.apache.org/jira/browse/KAFKA-13118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch updated KAFKA-13118:
----------------------------------
    Description: 
We need to backport the fix (commit hash `0314801a8e`) for KAFKA-9887 to the `3.0` branch. That fix was merged to `trunk`, `2.8`, and `2.7` _after_ the 3.0 code freeze, and that issue is not a blocker or regression.

Be sure to update the "fix version" on KAFKA-9887 when the backport is complete.

    was: We need to backport the fix (commit hash `0314801a8e`) for KAFKA-9887 to the `3.0` branch. That fix was merged to `trunk`, `2.8`, and `2.7` _after_ the 3.0 code freeze, and that issue is not a blocker or regression.

> Backport KAFKA-9887 to 3.0 branch after 3.0.0 release
> -----------------------------------------------------
>
>                 Key: KAFKA-13118
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13118
>             Project: Kafka
>          Issue Type: Task
>          Components: KafkaConnect
>    Affects Versions: 3.0.1
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>            Priority: Blocker
>             Fix For: 3.0.1
>
> We need to backport the fix (commit hash `0314801a8e`) for KAFKA-9887 to the
> `3.0` branch. That fix was merged to `trunk`, `2.8`, and `2.7` _after_ the
> 3.0 code freeze, and that issue is not a blocker or regression.
> Be sure to update the "fix version" on KAFKA-9887 when the backport is
> complete.
[jira] [Created] (KAFKA-13118) Backport KAFKA-9887 to 3.0 branch after 3.0.0 release
Randall Hauch created KAFKA-13118:
---------------------------------
             Summary: Backport KAFKA-9887 to 3.0 branch after 3.0.0 release
                 Key: KAFKA-13118
                 URL: https://issues.apache.org/jira/browse/KAFKA-13118
             Project: Kafka
          Issue Type: Task
          Components: KafkaConnect
    Affects Versions: 3.0.1
            Reporter: Randall Hauch
            Assignee: Randall Hauch
             Fix For: 3.0.1

We need to backport the fix (commit hash `0314801a8e`) for KAFKA-9887 to the `3.0` branch. That fix was merged to `trunk`, `2.8`, and `2.7` _after_ the 3.0 code freeze, and that issue is not a blocker or regression.
[jira] [Commented] (KAFKA-9887) failed-task-count JMX metric not updated if task fails during startup
[ https://issues.apache.org/jira/browse/KAFKA-9887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17385005#comment-17385005 ]

Randall Hauch commented on KAFKA-9887:
--------------------------------------
Unfortunately, the 3.0 code freeze was two weeks ago, so I will create a new issue as a blocker for 3.0.1 to backport this fix to the `3.0` branch after the 3.0.0 release is complete, and will link it to this issue.

> failed-task-count JMX metric not updated if task fails during startup
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-9887
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9887
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.4.0, 2.5.0, 2.4.1
>            Reporter: Chris Egerton
>            Assignee: Michael Carter
>            Priority: Major
>             Fix For: 3.1.0, 2.7.2, 2.8.1
>
> If a task fails on startup (specifically, during [this code
> section|https://github.com/apache/kafka/blob/00a59b392d92b0d6d3a321ef9a53dae4b3a9d030/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L427-L468]),
> the {{failed-task-count}} JMX metric is not updated to reflect the task
> failure, even though the status endpoints in the REST API do report the task
> as failed.
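The class of bug described in KAFKA-9887 can be illustrated generically. This sketch is not Connect's actual Worker code; the names and shapes are invented for illustration. The essential point is that the failure counter must also be incremented on the startup path, not only when an already-running task fails.

```python
# Hedged, generic illustration of the KAFKA-9887 bug class: if the failure
# metric is only updated for running tasks, a task that throws during startup
# never increments failed-task-count, even though its status reports FAILED.
class TaskMetrics:
    def __init__(self):
        self.failed_task_count = 0

def start_task(task, metrics):
    try:
        task()  # startup work; may throw before the task ever runs
        return "RUNNING"
    except Exception:
        # the fix: record the failure even when startup itself throws,
        # so the metric agrees with the REST status endpoint
        metrics.failed_task_count += 1
        return "FAILED"

metrics = TaskMetrics()

def bad_task():
    raise RuntimeError("task configuration invalid")

status = start_task(bad_task, metrics)
print(status, metrics.failed_task_count)  # → FAILED 1
```

Without the increment in the `except` branch, `status` would still be `FAILED` while the metric stayed at 0, which is exactly the inconsistency the issue reports.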
[jira] [Assigned] (KAFKA-12851) Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable
[ https://issues.apache.org/jira/browse/KAFKA-12851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jose Armando Garcia Sancio reassigned KAFKA-12851:
--------------------------------------------------
    Assignee: Jose Armando Garcia Sancio

> Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-12851
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12851
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: Jose Armando Garcia Sancio
>            Priority: Blocker
>             Fix For: 3.0.0
>
>         Attachments: Capture.PNG
>
> Failed twice on a [PR build|https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10755/6/testReport/]
>
> h3. Stacktrace
> org.opentest4j.AssertionFailedError: expected: but was:
>     at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
>     at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
>     at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35)
>     at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162)
>     at org.apache.kafka.raft.RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable(RaftEventSimulationTest.java:263)
[jira] [Resolved] (KAFKA-13104) Controller should notify the RaftClient when it resigns
[ https://issues.apache.org/jira/browse/KAFKA-13104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin McCabe resolved KAFKA-13104.
----------------------------------
    Resolution: Fixed

Fixed

> Controller should notify the RaftClient when it resigns
> -------------------------------------------------------
>
>                 Key: KAFKA-13104
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13104
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller, kraft
>            Reporter: Jose Armando Garcia Sancio
>            Assignee: Ryan Dielhenn
>            Priority: Blocker
>              Labels: kip-500
>             Fix For: 3.0.0
>
> {code:java}
>     private Throwable handleEventException(String name,
>                                            Optional startProcessingTimeNs,
>                                            Throwable exception) {
>         ...
>         renounce();
>         return new UnknownServerException(exception);
>     }
> {code}
> When the active controller encounters an event exception, it attempts to
> renounce leadership. Unfortunately, this doesn't tell the {{RaftClient}} that
> it should attempt to give up leadership. This will result in inconsistent
> state, with the {{RaftClient}} as leader but the controller inactive.
> We should change this implementation so that the active controller asks the
> {{RaftClient}} to resign. The active controller waits until
> {{handleLeaderChange}} before calling {{renounce()}}.
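The ordering the issue asks for can be sketched abstractly. This is illustrative Python, not the real QuorumController/RaftClient API: on an event exception, the controller asks the Raft layer to resign and only renounces its own state once a leader-change notification confirms it is no longer leader.

```python
# Hedged sketch of the resignation ordering described in KAFKA-13104.
# All class and method names here are invented for illustration.
class Controller:
    def __init__(self, raft):
        self.raft = raft
        self.active = True
        self.log = []

    def handle_event_exception(self):
        # Do NOT renounce here: that would leave the raft layer believing we
        # are still leader while the controller is already inactive.
        self.raft.resign(self)

    def handle_leader_change(self, new_leader):
        # Renounce only once the raft layer reports the leadership change,
        # so controller state and raft state cannot diverge.
        if new_leader != "me":
            self.active = False
            self.log.append("renounced")

class FakeRaftClient:
    def resign(self, listener):
        # the raft layer gives up leadership, then notifies its listener
        listener.handle_leader_change(new_leader="other")

controller = Controller(FakeRaftClient())
controller.handle_event_exception()
print(controller.active, controller.log)  # → False ['renounced']
```

The buggy variant would set `active = False` directly inside `handle_event_exception`, producing exactly the "raft says leader, controller says inactive" split the issue describes.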
[GitHub] [kafka] vvcephei commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API
vvcephei commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r674135420

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
## @@ -558,14 +550,14 @@ public final void addStateStore(final StoreBuilder storeBuilder,
     nodeGroups = null;
 }

-public final void addGlobalStore(final StoreBuilder storeBuilder,
+public final void addGlobalStore(final StoreBuilder storeBuilder,

Review comment: Huh, I was surprised to see this change, but I figured out it's because of the test below. That test seems to be abusing the source node processor, so let's roll back this change and fix that test.

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
## @@ -120,19 +123,19 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
 }

 @Override
-public void process(final K key, final Change change) {
-    final V1 newValue = computeValue(key, change.newValue);
-    final V1 oldValue = computeOldValue(key, change);
+public void process(final Record> record) {
+    final VOut newValue = computeValue(record.key(), record.value().newValue);
+    final VOut oldValue = computeOldValue(record.key(), record.value());
     if (queryableName != null) {
-        store.put(key, ValueAndTimestamp.make(newValue, context().timestamp()));
-        tupleForwarder.maybeForward(key, newValue, oldValue);
+        store.put(record.key(), ValueAndTimestamp.make(newValue, record.timestamp()));
+        tupleForwarder.maybeForward(record.key(), newValue, oldValue);

Review comment: During review, it struck me that we're just internally going to turn this back into a Record. We can update the internal API to accept the new forwarding type, but we don't need to tack that on to this PR. I filed https://issues.apache.org/jira/browse/KAFKA-13117 so we don't forget.

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
## @@ -92,33 +97,36 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
 }

 @Override
-public void process(final K key, final V value) {
+public void process(final Record record) {
     // if the key is null, then ignore the record
-    if (key == null) {
-        LOG.warn(
+    if (record.key() == null) {
+        context.recordMetadata().ifPresent(recordMetadata -> LOG.warn(
             "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
-            context().topic(), context().partition(), context().offset()
-        );
+            recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+        ));
         droppedRecordsSensor.record();
         return;
     }
     if (queryableName != null) {
-        final ValueAndTimestamp oldValueAndTimestamp = store.get(key);
-        final V oldValue;
+        final ValueAndTimestamp oldValueAndTimestamp = store.get(record.key());
+        final VIn oldValue;
         if (oldValueAndTimestamp != null) {
             oldValue = oldValueAndTimestamp.value();
-            if (context().timestamp() < oldValueAndTimestamp.timestamp()) {
-                LOG.warn("Detected out-of-order KTable update for {} at offset {}, partition {}.",
-                    store.name(), context().offset(), context().partition());
+            if (record.timestamp() < oldValueAndTimestamp.timestamp()) {
+                context.recordMetadata().ifPresent(recordMetadata ->
+                    LOG.warn(
+                        "Detected out-of-order KTable update for {} at offset {}, partition {}.",

Review comment: Also, this is not related to your change, but it occurs to me that we're missing some important information here, which would likely save people a lot of time. Can we add the new and old timestamps to the log messages?

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
## @@ -92,33 +97,36 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
 }

 @Override
-public void process(final K key, final V value) {
+public void process(final Record record) {
     // if the key is null, then ignore the record
-    if (key == null) {
-        LOG.warn(
+    if (record.key() == null) {
+        context.recordMetadata().ifPresent(recordMetadata -> LOG.warn(

Review comment:
[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size
hachikuji commented on a change in pull request #11098: URL: https://github.com/apache/kafka/pull/11098#discussion_r674140433

## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala

```diff
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
     }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-    scheduler.schedule("transactionalId-expiration", () => {
-      val now = time.milliseconds()
-      inReadLock(stateLock) {
-        val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-          transactionMetadataCache.flatMap { case (_, entry) =>
-            entry.metadataPerTransactionalId.filter { case (_, txnMetadata) =>
-              txnMetadata.state match {
-                case Empty | CompleteCommit | CompleteAbort => true
-                case _ => false
-              }
-            }.filter { case (_, txnMetadata) =>
-              txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs
-            }.map { case (transactionalId, txnMetadata) =>
-              val txnMetadataTransition = txnMetadata.inLock {
-                txnMetadata.prepareDead()
-              }
-              TransactionalIdCoordinatorEpochAndMetadata(transactionalId, entry.coordinatorEpoch, txnMetadataTransition)
-            }
-          }.groupBy { transactionalIdCoordinatorEpochAndMetadata =>
-            partitionFor(transactionalIdCoordinatorEpochAndMetadata.transactionalId)
-          }
-        val recordsPerPartition = transactionalIdByPartition
-          .map { case (partition, transactionalIdCoordinatorEpochAndMetadatas) =>
-            val deletes: Array[SimpleRecord] = transactionalIdCoordinatorEpochAndMetadatas.map { entry =>
-              new SimpleRecord(now, TransactionLog.keyToBytes(entry.transactionalId), null)
-            }.toArray
-            val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, deletes: _*)
-            val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
-            (topicPartition, records)
-          }
-        def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = {
-          responses.forKeyValue { (topicPartition, response) =>
-            inReadLock(stateLock) {
-              val toRemove = transactionalIdByPartition(topicPartition.partition)
-              transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry =>
-                toRemove.foreach { idCoordinatorEpochAndMetadata =>
-                  val transactionalId = idCoordinatorEpochAndMetadata.transactionalId
-                  val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(transactionalId)
-                  txnMetadata.inLock {
-                    if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
-                      && txnMetadata.pendingState.contains(Dead)
+  private def collectExpiredTransactionalIds(
+    partitionId: Int,
+    partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+    val currentTimeMs = time.milliseconds()
+
+    inReadLock(stateLock) {
+      val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+      replicaManager.getLogConfig(transactionPartition) match {
+        case Some(logConfig) =>
+          val maxBatchSize = logConfig.maxMessageSize
+          val expired = mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),
+            TransactionLog.EnforcedCompressionType,
+            TimestampType.CREATE_TIME,
+            0L,
+            maxBatchSize
+          )
+
+          partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) =>
+            txnMetadata.inLock {
+              if (!shouldExpire(txnMetadata, currentTimeMs)) {
+                true
+              } else if (maybeAppendExpiration(txnMetadata, recordsBuilder, currentTimeMs, maxBatchSize)) {
+                val transitMetadata = txnMetadata.prepareDead()
+                expired += TransactionalIdCoordinatorEpochAndMetadata(
+                  transactionalId,
+                  partitionCacheEntry.coordinatorEpoch,
+                  transitMetadata
+                )
+                true
+              } else {
+                // If the batch is full, return false to end the search. Any remaining
+                // transactionalIds eligible for expiration can be picked next time.
+                false
+              }
+            }
+          }
+
+          if (expired.isEmpty) {
+            (Seq.empty, MemoryRecords.EMPTY)
+          } else {
+            (expired, recordsBuilder.build())
+          }
```
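The early-exit control flow in the new code, stop collecting once the batch is full and let the next expiration run pick up the remainder, can be sketched independently of the broker internals. This is a standalone Java sketch with illustrative names, not Kafka's actual types:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BoundedBatchCollector {
    // Collects ids into a batch until adding the next one would exceed
    // maxBatchBytes, then stops early (like foreachWhile returning false).
    // Whatever is left over is simply collected on the next run.
    static List<String> collectBatch(List<String> candidates, int maxBatchBytes) {
        List<String> batch = new ArrayList<>();
        int batchBytes = 0;
        for (String id : candidates) {
            int recordBytes = id.getBytes(StandardCharsets.UTF_8).length;
            if (batchBytes + recordBytes > maxBatchBytes) {
                break; // batch full: end the search here
            }
            batch.add(id);
            batchBytes += recordBytes;
        }
        return batch;
    }
}
```

The design choice mirrored here is that a full batch is not an error: the scheduler runs the expiration task periodically, so bounded progress per run is enough.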
[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size
hachikuji commented on a change in pull request #11098: URL: https://github.com/apache/kafka/pull/11098#discussion_r674117013

## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala

```diff
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),
```

Review comment: Filling a batch is an uncommon scenario, so I thought that allocating a full 1MB (default max.message.bytes) buffer each time the task ran seemed excessive. 16K seemed more reasonable for the common case. Another thought I considered was a statically allocated buffer, but that seemed like overkill.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
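The sizing trade-off described above boils down to capping the initial allocation at 16 KiB instead of reserving the topic's full max.message.bytes up front. A minimal sketch (class and method names here are illustrative):

```java
import java.nio.ByteBuffer;

public class BatchBufferSizing {
    static final int DEFAULT_INITIAL_CAPACITY = 16_384; // 16 KiB

    // Allocate a small buffer for the common (mostly empty) case;
    // the topic's max batch size only lowers the allocation further
    // when that limit is smaller than 16 KiB.
    static ByteBuffer initialBuffer(int maxBatchSize) {
        return ByteBuffer.allocate(Math.min(DEFAULT_INITIAL_CAPACITY, maxBatchSize));
    }
}
```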
[GitHub] [kafka] cadonna merged pull request #11061: MINOR: Add test for verifying retention on changelog topics
cadonna merged pull request #11061: URL: https://github.com/apache/kafka/pull/11061
[GitHub] [kafka] cadonna commented on pull request #11061: MINOR: Add test for verifying retention on changelog topics
cadonna commented on pull request #11061: URL: https://github.com/apache/kafka/pull/11061#issuecomment-884279103 Test failures are unrelated.
[GitHub] [kafka] cadonna commented on pull request #11093: FIX: add serde configs to properly set serde
cadonna commented on pull request #11093: URL: https://github.com/apache/kafka/pull/11093#issuecomment-884274934 Build ran into a timeout. I retriggered the build.
[GitHub] [kafka] jsancio commented on pull request #10775: KAFKA-12668: Making MockScheduler.schedule safe to use in concurrent code
jsancio commented on pull request #10775: URL: https://github.com/apache/kafka/pull/10775#issuecomment-884274297 @iakunin apologies for the delay in reviewing this, but a few of the committers and contributors have been busy getting 3.0 ready for release.
[jira] [Commented] (KAFKA-8529) Flakey test ConsumerBounceTest#testCloseDuringRebalance
[ https://issues.apache.org/jira/browse/KAFKA-8529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17384947#comment-17384947 ]

Bruno Cadonna commented on KAFKA-8529:

Failed again: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11093/4/testReport/kafka.api/ConsumerBounceTest/Build___JDK_11_and_Scala_2_13___testCloseDuringRebalance___2/

{code:java}
org.opentest4j.AssertionFailedError: Rebalance did not complete in time ==> expected: <true> but was: <false>
	at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
	at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
	at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193)
	at kafka.api.ConsumerBounceTest.waitForRebalance$1(ConsumerBounceTest.scala:400)
	at kafka.api.ConsumerBounceTest.checkCloseDuringRebalance(ConsumerBounceTest.scala:414)
	at kafka.api.ConsumerBounceTest.testCloseDuringRebalance(ConsumerBounceTest.scala:381)
{code}

> Flakey test ConsumerBounceTest#testCloseDuringRebalance
> -------------------------------------------------------
> Key: KAFKA-8529
> URL: https://issues.apache.org/jira/browse/KAFKA-8529
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Reporter: Boyang Chen
> Priority: Major
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/5450/consoleFull]
>
> *16:16:10* kafka.api.ConsumerBounceTest > testCloseDuringRebalance STARTED
> *16:16:22* kafka.api.ConsumerBounceTest.testCloseDuringRebalance failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testCloseDuringRebalance.test.stdout
> *16:16:22* kafka.api.ConsumerBounceTest > testCloseDuringRebalance FAILED
> *16:16:22* java.lang.AssertionError: Rebalance did not complete in time
> *16:16:22* at org.junit.Assert.fail(Assert.java:89)
> *16:16:22* at org.junit.Assert.assertTrue(Assert.java:42)
> *16:16:22* at kafka.api.ConsumerBounceTest.waitForRebalance$1(ConsumerBounceTest.scala:402)
> *16:16:22* at kafka.api.ConsumerBounceTest.checkCloseDuringRebalance(ConsumerBounceTest.scala:416)
> *16:16:22* at kafka.api.ConsumerBounceTest.testCloseDuringRebalance(ConsumerBounceTest.scala:379)

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13117) After processors, migrate TupleForwarder and CacheFlushListener
John Roesler created KAFKA-13117:

Summary: After processors, migrate TupleForwarder and CacheFlushListener
Key: KAFKA-13117
URL: https://issues.apache.org/jira/browse/KAFKA-13117
Project: Kafka
Issue Type: Sub-task
Reporter: John Roesler

Currently, both of these interfaces take plain values in combination with timestamps:

CacheFlushListener:
{code:java}
void apply(K key, V newValue, V oldValue, long timestamp)
{code}

TimestampedTupleForwarder:
{code:java}
void maybeForward(K key, V newValue, V oldValue, long timestamp)
{code}

These are internally translated to the new PAPI, but after the processors are migrated, there won't be a need for this translation. We should update both of these APIs to just accept {{Record<K, Change<V>>}}.
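The migration the ticket describes, collapsing the four-argument (key, newValue, oldValue, timestamp) callbacks into a single record carrying a change, can be sketched with stand-in types. These are illustrative classes, not the real Kafka Streams `Record` and `Change`:

```java
public class FlushForwarding {
    // Minimal stand-ins for the Streams types the ticket refers to.
    record Change<V>(V newValue, V oldValue) {}
    record Record<K, V>(K key, V value, long timestamp) {}

    // How the old four-argument callback shape maps onto Record<K, Change<V>>:
    // the new/old value pair folds into the record's value.
    static <K, V> Record<K, Change<V>> toRecord(K key, V newValue, V oldValue, long timestamp) {
        return new Record<>(key, new Change<>(newValue, oldValue), timestamp);
    }
}
```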
[jira] [Resolved] (KAFKA-10546) KIP-478: Deprecate the old PAPI interfaces
[ https://issues.apache.org/jira/browse/KAFKA-10546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler resolved KAFKA-10546.
Resolution: Fixed

> KIP-478: Deprecate the old PAPI interfaces
> ------------------------------------------
> Key: KAFKA-10546
> URL: https://issues.apache.org/jira/browse/KAFKA-10546
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: John Roesler
> Assignee: John Roesler
> Priority: Major
>
> Can't be done until after the DSL internals are migrated.
[GitHub] [kafka] lct45 commented on pull request #11093: FIX: add serde configs to properly set serde
lct45 commented on pull request #11093: URL: https://github.com/apache/kafka/pull/11093#issuecomment-884232748 @cadonna @ableegoldman is one of you able to merge? And potentially cherry-pick to the 3.0 branch?
[jira] [Updated] (KAFKA-13116) KIP-724: Adjust system tests due to KAFKA-12944
[ https://issues.apache.org/jira/browse/KAFKA-13116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-13116:
Summary: KIP-724: Adjust system tests due to KAFKA-12944 (was: Adjust system tests due to KAFKA-12944)

> KIP-724: Adjust system tests due to KAFKA-12944
> -----------------------------------------------
> Key: KAFKA-13116
> URL: https://issues.apache.org/jira/browse/KAFKA-13116
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Ismael Juma
> Assignee: Ismael Juma
> Priority: Blocker
> Fix For: 3.0.0
>
> Several system tests involving legacy message formats are failing due to KAFKA-12944:
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2021-07-21--001.system-test-kafka-trunk--1626872410--confluentinc--master–038bdaa4df/report.html
> All system tests that write data with legacy message formats need to use IBP 2.8 or lower.
[jira] [Created] (KAFKA-13116) Adjust system tests due to KAFKA-12944
Ismael Juma created KAFKA-13116:

Summary: Adjust system tests due to KAFKA-12944
Key: KAFKA-13116
URL: https://issues.apache.org/jira/browse/KAFKA-13116
Project: Kafka
Issue Type: Sub-task
Reporter: Ismael Juma
Assignee: Ismael Juma
Fix For: 3.0.0

Several system tests involving legacy message formats are failing due to KAFKA-12944:
http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2021-07-21--001.system-test-kafka-trunk--1626872410--confluentinc--master–038bdaa4df/report.html

All system tests that write data with legacy message formats need to use IBP 2.8 or lower.
[jira] [Updated] (KAFKA-12418) Make sure it's ok not to include test jars in the release tarball
[ https://issues.apache.org/jira/browse/KAFKA-12418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-12418:
Fix Version/s: (was: 3.0.0)

> Make sure it's ok not to include test jars in the release tarball
> -----------------------------------------------------------------
> Key: KAFKA-12418
> URL: https://issues.apache.org/jira/browse/KAFKA-12418
> Project: Kafka
> Issue Type: Improvement
> Reporter: Ismael Juma
> Assignee: Ismael Juma
> Priority: Blocker
>
> As of https://github.com/apache/kafka/pull/10203, the release tarball no longer includes test, sources, javadoc and test sources jars. These are still published to the Maven Central repository.
> This seems like a good change and 3.0.0 would be a good time to do it, but filing this JIRA to follow up and make sure before said release.
[jira] [Resolved] (KAFKA-12418) Make sure it's ok not to include test jars in the release tarball
[ https://issues.apache.org/jira/browse/KAFKA-12418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma resolved KAFKA-12418.
Resolution: Done

> Make sure it's ok not to include test jars in the release tarball
> -----------------------------------------------------------------
> Key: KAFKA-12418
> URL: https://issues.apache.org/jira/browse/KAFKA-12418
> Project: Kafka
> Issue Type: Improvement
> Reporter: Ismael Juma
> Assignee: Ismael Juma
> Priority: Blocker
>
> As of https://github.com/apache/kafka/pull/10203, the release tarball no longer includes test, sources, javadoc and test sources jars. These are still published to the Maven Central repository.
> This seems like a good change and 3.0.0 would be a good time to do it, but filing this JIRA to follow up and make sure before said release.
[GitHub] [kafka] ijuma commented on pull request #11096: Adding reviewers.py to help tag reviewers in commit message
ijuma commented on pull request #11096: URL: https://github.com/apache/kafka/pull/11096#issuecomment-884179925 Thanks, this is helpful. Any chance we can add a map that deduplicates emails from committers? In this PR, we could add the map and a couple of cases and then we can iterate as needed.
[GitHub] [kafka] ijuma merged pull request #11094: MINOR: Improve usage of LogManager.currentDefaultConfig
ijuma merged pull request #11094: URL: https://github.com/apache/kafka/pull/11094
[GitHub] [kafka] ijuma commented on pull request #11094: MINOR: Improve usage of LogManager.currentDefaultConfig
ijuma commented on pull request #11094: URL: https://github.com/apache/kafka/pull/11094#issuecomment-884178111 Test failure is unrelated: kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
[GitHub] [kafka] cadonna commented on pull request #11100: MINOR: update doc to reflect the grace period change
cadonna commented on pull request #11100: URL: https://github.com/apache/kafka/pull/11100#issuecomment-884117221 Cherry-picked to 3.0
[GitHub] [kafka] cadonna merged pull request #11100: MINOR: update doc to reflect the grace period change
cadonna merged pull request #11100: URL: https://github.com/apache/kafka/pull/11100
[jira] [Created] (KAFKA-13115) doSend can be blocking
Ivan Vaskevych created KAFKA-13115:

Summary: doSend can be blocking
Key: KAFKA-13115
URL: https://issues.apache.org/jira/browse/KAFKA-13115
Project: Kafka
Issue Type: Bug
Reporter: Ivan Vaskevych

https://github.com/apache/kafka/pull/11023
[GitHub] [kafka] showuon commented on pull request #11100: MINOR: update doc to reflect the grace period change
showuon commented on pull request #11100: URL: https://github.com/apache/kafka/pull/11100#issuecomment-884060176 @cadonna, thanks for the comments. PR updated. Thank you.
[GitHub] [kafka] dajac commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size
dajac commented on a change in pull request #11098: URL: https://github.com/apache/kafka/pull/11098#discussion_r673818846

## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala

```diff
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),
```

Review comment: For my own education, why do we use `16384` as a minimum here?

## File path: core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala

```diff
@@ -629,6 +631,62 @@ class TransactionStateManagerTest {
     verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
+  @Test
+  def testTransactionExpirationShouldRespectBatchSize(): Unit = {
+    val partitionIds = 0 until numPartitions
+    val maxBatchSize = 512
+
+    loadTransactionsForPartitions(partitionIds)
+
+    val allTransactionalIds = mutable.Set.empty[String]
+    for (i <- 0 to 1000) {
+      val txnlId = s"id_$i"
+      val producerId = i
+      val txnMetadata = transactionMetadata(txnlId, producerId)
+      txnMetadata.txnLastUpdateTimestamp = time.milliseconds() - txnConfig.transactionalIdExpirationMs
+      transactionManager.putTransactionStateIfNotExists(txnMetadata)
+      allTransactionalIds += txnlId
+    }
+
+    def removeExpiredTransactionalIds(): Map[TopicPartition, MemoryRecords] = {
+      EasyMock.reset(replicaManager)
+      expectLogConfig(partitionIds, maxBatchSize)
+
+      val appendedRecordsCapture = expectTransactionalIdExpiration(Errors.NONE)
+      EasyMock.replay(replicaManager)
+
+      transactionManager.removeExpiredTransactionalIds()
+      EasyMock.verify(replicaManager)
+
+      assertTrue(appendedRecordsCapture.hasCaptured)
+      appendedRecordsCapture.getValue
+    }
+
+    def hasUnexpiredTransactionalIds: Boolean = {
+      val unexpiredTransactions = transactionManager.listTransactionStates(Set.empty, Set.empty)
+        .transactionStates.asScala
+      assertTrue(unexpiredTransactions.forall(txn => txn.transactionState == Empty.name))
+      unexpiredTransactions.nonEmpty
+    }
+
+    var iterations = 0
+    val expiredTransactionalIds = mutable.Set.empty[String]
+    while (hasUnexpiredTransactionalIds) {
+      removeExpiredTransactionalIds().forKeyValue { (_, records) =>
+        assertTrue(records.sizeInBytes() < maxBatchSize)
```

Review comment: nit: We might be able to drop the `()` of `sizeInBytes` and `records` below.

## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala

```diff
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
-            }.filter { case (_, txnMetadata) =>
-              txnMetadata.txnLastUpdateTimestamp <= now - confi
```
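The test above drives expiration round by round, checking that every appended batch stays under the max batch size until no expirable ids remain. That multi-round shape can be sketched without any Kafka dependencies (illustrative names only):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ExpirationRounds {
    // Drains pending ids in successive rounds, each round's batch kept
    // under maxBatchBytes, mirroring the loop in the test above.
    static List<List<String>> drainInRounds(List<String> pending, int maxBatchBytes) {
        Deque<String> queue = new ArrayDeque<>(pending);
        List<List<String>> rounds = new ArrayList<>();
        while (!queue.isEmpty()) {
            List<String> batch = new ArrayList<>();
            int bytes = 0;
            while (!queue.isEmpty()) {
                int next = queue.peek().getBytes(StandardCharsets.UTF_8).length;
                // Stop this round when the batch is full (but always take at
                // least one element so the loop is guaranteed to terminate).
                if (bytes + next > maxBatchBytes && !batch.isEmpty()) {
                    break;
                }
                batch.add(queue.poll());
                bytes += next;
            }
            rounds.add(batch);
        }
        return rounds;
    }
}
```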
[GitHub] [kafka] showuon commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change
showuon commented on a change in pull request #11100: URL: https://github.com/apache/kafka/pull/11100#discussion_r673833419

## File path: docs/streams/developer-guide/dsl-api.html

```diff
@@ -3230,13 +3229,13 @@ KTable-KTable Foreign-Key
 import org.apache.kafka.streams.kstream.SlidingWindows;
 
 // A sliding time window with a time difference of 10 minutes and grace period of 30 minutes
-Duration timeDifferenceMs = Duration.ofMinutes(10);
-Duration gracePeriodMs = Duration.ofMinutes(30);
-SlidingWindows.withTimeDifferenceAndGrace(timeDifferenceMs,gracePeriodMs);
+Duration timeDifference = Duration.ofMinutes(10);
+Duration gracePeriod = Duration.ofMinutes(30);
+SlidingWindows.ofTimeDifferenceAndGrace(timeDifference, gracePeriod);
 
 Note
 Sliding windows require that you set a grace period, as shown above. For time windows and session windows,
-setting the grace period is optional and defaults to 24 hours.
+setting the grace period is optional.
```

Review comment: You're right. Removed the note.
[GitHub] [kafka] showuon commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change
showuon commented on a change in pull request #11100: URL: https://github.com/apache/kafka/pull/11100#discussion_r673833045

## File path: docs/streams/developer-guide/dsl-api.html

```diff
@@ -3208,14 +3208,13 @@ KTable-KTable Foreign-Key
 import org.apache.kafka.streams.kstream.TimeWindows;
 
 // A tumbling time window with a size of 5 minutes (and, by definition, an implicit
-// advance interval of 5 minutes). Note the explicit grace period, as the current
-// default value is 24 hours, which may be larger than needed for smaller windows.
-Duration windowSizeMs = Duration.ofMinutes(5);
-Duration gracePeriodMs = Duration.ofMinutes(1);
-TimeWindows.of(windowSizeMs).grace(gracePeriodMs);
+// advance interval of 5 minutes).
+Duration windowSize = Duration.ofMinutes(5);
+Duration gracePeriod = Duration.ofMinutes(1);
```

Review comment: Makes sense! Added a comment about the 1-minute grace period: `A tumbling time window with a size of 5 minutes (and, by definition, an implicit advance interval of 5 minutes), and grace period of 1 minute.`
[GitHub] [kafka] cadonna closed pull request #10944: MINOR: Loose verification of startup in EOS system tests
cadonna closed pull request #10944: URL: https://github.com/apache/kafka/pull/10944
[GitHub] [kafka] cadonna commented on pull request #10944: MINOR: Loose verification of startup in EOS system tests
cadonna commented on pull request #10944: URL: https://github.com/apache/kafka/pull/10944#issuecomment-884015976 PR #11090 got merged. I will close this for now.
[GitHub] [kafka] cadonna commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change
cadonna commented on a change in pull request #11100: URL: https://github.com/apache/kafka/pull/11100#discussion_r673778552

## File path: docs/streams/developer-guide/dsl-api.html

```diff
@@ -3208,14 +3208,13 @@ KTable-KTable Foreign-Key
 import org.apache.kafka.streams.kstream.TimeWindows;
 
 // A tumbling time window with a size of 5 minutes (and, by definition, an implicit
-// advance interval of 5 minutes). Note the explicit grace period, as the current
-// default value is 24 hours, which may be larger than needed for smaller windows.
-Duration windowSizeMs = Duration.ofMinutes(5);
-Duration gracePeriodMs = Duration.ofMinutes(1);
-TimeWindows.of(windowSizeMs).grace(gracePeriodMs);
+// advance interval of 5 minutes).
+Duration windowSize = Duration.ofMinutes(5);
+Duration gracePeriod = Duration.ofMinutes(1);
```

Review comment: Here I would either mention the grace period in the comment above or remove the grace period completely. Otherwise, it comes as a bit of a surprise that a grace period is specified.

## File path: docs/streams/developer-guide/dsl-api.html

```diff
@@ -3230,13 +3229,13 @@ KTable-KTable Foreign-Key
 import org.apache.kafka.streams.kstream.SlidingWindows;
 
 // A sliding time window with a time difference of 10 minutes and grace period of 30 minutes
-Duration timeDifferenceMs = Duration.ofMinutes(10);
-Duration gracePeriodMs = Duration.ofMinutes(30);
-SlidingWindows.withTimeDifferenceAndGrace(timeDifferenceMs,gracePeriodMs);
+Duration timeDifference = Duration.ofMinutes(10);
+Duration gracePeriod = Duration.ofMinutes(30);
+SlidingWindows.ofTimeDifferenceAndGrace(timeDifference, gracePeriod);
 
 Note
 Sliding windows require that you set a grace period, as shown above. For time windows and session windows,
-setting the grace period is optional and defaults to 24 hours.
+setting the grace period is optional.
```

Review comment: That is not true anymore. More precisely, it is only true for deprecated methods, which we do not want to mention in the docs, right? Maybe you can even remove the note completely, since it explains a difference between sliding windows and other windows that does not exist anymore.

## File path: docs/streams/developer-guide/dsl-api.html

```diff
@@ -3322,8 +3321,8 @@ KTable-KTable Foreign-Key
 The key parts of this program are:
-  grace(ofMinutes(10))
-  This allows us to bound the lateness of events the window will accept.
+  ofSizeAndGrace(Duration.ofHours(1), ofMinutes(10))
+  The ofMinutes(10) parameter allows us to bound the lateness of events the window will accept.
```

Review comment:
```suggestion
The specified grace period of 10 minutes (i.e., the ofMinutes(10) argument) allows us to bound the lateness of events the window will accept.
```
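The docs change under review describes the grace period as bounding the lateness of events a window will still accept. That acceptance rule can be sketched with plain `java.time`; this is an illustrative simplification, the real bookkeeping lives inside Kafka Streams' window stores:

```java
import java.time.Duration;

public class GracePeriodSketch {
    // A record belonging to an already-ended window is still accepted
    // while observed stream time has not advanced past windowEnd + grace.
    static boolean accepted(long recordWindowEndMs, long streamTimeMs, Duration grace) {
        return streamTimeMs < recordWindowEndMs + grace.toMillis();
    }
}
```

For example, with a window ending at t=1000 ms and a 600 ms grace period, a late record is accepted while stream time is below 1600 ms and dropped afterward.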
[GitHub] [kafka] showuon commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change
showuon commented on a change in pull request #11100: URL: https://github.com/apache/kafka/pull/11100#discussion_r673759493

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java

```diff
@@ -204,7 +204,7 @@ public long size() {
  * @param afterWindowEnd The grace period to admit out-of-order events to a window.
  * @return this updated builder
  * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
- * @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)} instead
+ * @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead
  */
 @Deprecated
 public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
```

Review comment: We should use `ofTimeDifferenceAndGrace` for the original `grace` method. Fixed.
[GitHub] [kafka] showuon commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change
showuon commented on a change in pull request #11100: URL: https://github.com/apache/kafka/pull/11100#discussion_r673735822

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java

```diff
@@ -138,7 +138,7 @@ public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDiffere
  * @param timeDifference
  * @return a new JoinWindows object with the window definition with and grace period (default to 24 hours minus {@code timeDifference})
  * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
- * @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead
+ * @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)}} instead
  */
 @Deprecated
 public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException {
```

Review comment: We should use `ofTimeDifferenceWithNoGrace` for the original `of` method. Fixed.
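The javadoc quoted above notes that the deprecated `JoinWindows.of(timeDifference)` derived its grace period as 24 hours minus the time difference. That arithmetic is simple enough to sketch directly (an illustrative helper, not Kafka code; the replacement `ofTimeDifferenceAndGrace`/`ofTimeDifferenceWithNoGrace` APIs make the choice explicit instead):

```java
import java.time.Duration;

public class JoinWindowDefaults {
    static final Duration DEFAULT_WINDOW_RETENTION = Duration.ofHours(24);

    // Grace period implied by the deprecated of(timeDifference) factory,
    // per the javadoc: 24 hours minus the join window's time difference.
    static Duration impliedGrace(Duration timeDifference) {
        return DEFAULT_WINDOW_RETENTION.minus(timeDifference);
    }
}
```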
[GitHub] [kafka] showuon commented on pull request #11100: MINOR: update doc to reflect the grace period change
showuon commented on pull request #11100: URL: https://github.com/apache/kafka/pull/11100#issuecomment-883965278 @ableegoldman @mjsax @cadonna , please take a look. I think this PR should also be merged into v3.0. Thank you.
[GitHub] [kafka] showuon commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change
showuon commented on a change in pull request #11100: URL: https://github.com/apache/kafka/pull/11100#discussion_r673733752 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ## @@ -138,7 +138,7 @@ public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDiffere * @param timeDifference * @return a new JoinWindows object with the window definition with and grace period (default to 24 hours minus {@code timeDifference}) * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds} - * @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead + * @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)}} instead Review comment: The deprecation note for the original `of` method should point to `ofTimeDifferenceWithNoGrace`, since `of` sets no explicit grace period; fixed here.
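As the diff's Javadoc states, the deprecated `of(timeDifference)` implied a default grace period of 24 hours minus the time difference, whereas `ofTimeDifferenceWithNoGrace(timeDifference)` means a grace period of exactly zero. A small illustrative sketch of that contrast (the helper class and method names here are made up, not Kafka API):

```java
import java.time.Duration;

// Illustrative helpers (not Kafka API) contrasting the two behaviors:
// of(timeDifference) implied a default grace of 24 hours minus the time
// difference, while ofTimeDifferenceWithNoGrace(timeDifference) means a
// grace period of exactly zero.
final class GraceDefaults {
    static final Duration LEGACY_RETENTION = Duration.ofHours(24);

    // Default grace under the deprecated of(...): 24 hours minus the
    // time difference, floored at zero.
    static Duration legacyDefaultGrace(Duration timeDifference) {
        Duration grace = LEGACY_RETENTION.minus(timeDifference);
        return grace.isNegative() ? Duration.ZERO : grace;
    }

    // Grace under ofTimeDifferenceWithNoGrace(...): always zero.
    static Duration noGrace() {
        return Duration.ZERO;
    }

    public static void main(String[] args) {
        System.out.println(legacyDefaultGrace(Duration.ofMinutes(5)).toMinutes()); // prints 1435
        System.out.println(noGrace().toMillis()); // prints 0
    }
}
```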
[GitHub] [kafka] showuon commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change
showuon commented on a change in pull request #11100: URL: https://github.com/apache/kafka/pull/11100#discussion_r673732882 ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -3208,14 +3208,13 @@ KTable-KTable Foreign-Key import org.apache.kafka.streams.kstream.TimeWindows; // A tumbling time window with a size of 5 minutes (and, by definition, an implicit -// advance interval of 5 minutes). Note the explicit grace period, as the current -// default value is 24 hours, which may be larger than needed for smaller windows. Review comment: Removed this note, since the 24-hour default grace period no longer applies: `Note the explicit grace period, as the current default value is 24 hours, which may be larger than needed for smaller windows.`
[GitHub] [kafka] showuon commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change
showuon commented on a change in pull request #11100: URL: https://github.com/apache/kafka/pull/11100#discussion_r673731818 ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -3164,9 +3164,9 @@ KTable-KTable Foreign-Key // A hopping time window with a size of 5 minutes and an advance interval of 1 minute. // The window's name -- the string parameter -- is used to e.g. name the backing state store. -Duration windowSizeMs = Duration.ofMinutes(5); -Duration advanceMs = Duration.ofMinutes(1); -TimeWindows.of(windowSizeMs).advanceBy(advanceMs); +Duration windowSize = Duration.ofMinutes(5); +Duration advance = Duration.ofMinutes(1); +TimeWindows.ofSizeWithNoGrace(windowSize).advanceBy(advance); Review comment: Updated the variable names here: these are `Duration` values, not `Long` millisecond values, so the `Ms` suffix no longer fits.
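The snippet in the diff above builds a hopping window of size 5 minutes with a 1-minute advance. To illustrate what those two `Duration` parameters mean, here is a self-contained sketch (not Kafka's implementation; the enumeration logic is assumed from standard hopping-window semantics) that lists the window start times a given record timestamp falls into:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not Kafka's implementation): with size 5 min and
// advance 1 min, every record falls into size / advance = 5 hopping windows.
final class HoppingWindowsSketch {
    static List<Long> windowStartsFor(long timestampMs, Duration windowSize, Duration advance) {
        long sizeMs = windowSize.toMillis();
        long advanceMs = advance.toMillis();
        List<Long> starts = new ArrayList<>();
        // Window starts are aligned to multiples of the advance interval;
        // a window [start, start + size) contains the timestamp iff
        // timestamp - size < start <= timestamp.
        long lastStart = timestampMs - (timestampMs % advanceMs);
        for (long start = lastStart; start > timestampMs - sizeMs && start >= 0; start -= advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        Duration windowSize = Duration.ofMinutes(5);
        Duration advance = Duration.ofMinutes(1);
        // A record at 390 000 ms (minute 6.5) falls into the windows
        // starting at minutes 6, 5, 4, 3 and 2.
        System.out.println(windowStartsFor(390_000L, windowSize, advance));
        // prints [360000, 300000, 240000, 180000, 120000]
    }
}
```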
[GitHub] [kafka] showuon commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change
showuon commented on a change in pull request #11100: URL: https://github.com/apache/kafka/pull/11100#discussion_r673730072 ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -650,7 +650,7 @@
[GitHub] [kafka] showuon opened a new pull request #11100: MINOR: update doc to reflect the grace period change
showuon opened a new pull request #11100: URL: https://github.com/apache/kafka/pull/11100 Found this issue while reading the Streams docs. KIP-633 removed the default 24-hour grace period and deprecated several grace-related methods, but the Streams docs were never updated to match. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)