[GitHub] [kafka] JoeCqupt commented on pull request #11089: MINOR: remove unnecessary judgment in AdminUtils::assignReplicasToBrokersRackAware

2021-07-21 Thread GitBox


JoeCqupt commented on pull request #11089:
URL: https://github.com/apache/kafka/pull/11089#issuecomment-884671155


   call for review @ijuma 






[GitHub] [kafka] ableegoldman edited a comment on pull request #11103: HOTFIX: Set session interval back to 10s for StreamsCooperativeRebalanceUpgradeTest

2021-07-21 Thread GitBox


ableegoldman edited a comment on pull request #11103:
URL: https://github.com/apache/kafka/pull/11103#issuecomment-884541103


   Kicked off system test run with 50 repeats -- 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4613/






[GitHub] [kafka] cmccabe merged pull request #11102: MINOR: Rename the @metadata topic to __cluster_metadata

2021-07-21 Thread GitBox


cmccabe merged pull request #11102:
URL: https://github.com/apache/kafka/pull/11102


   






[GitHub] [kafka] ableegoldman edited a comment on pull request #11103: HOTFIX: Set session interval back to 10s for StreamsCooperativeRebalanceUpgradeTest

2021-07-21 Thread GitBox


ableegoldman edited a comment on pull request #11103:
URL: https://github.com/apache/kafka/pull/11103#issuecomment-884541103


   Kicked off system test run with 15 repeats -- 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4612/






[GitHub] [kafka] d8tltanc commented on pull request #11002: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings

2021-07-21 Thread GitBox


d8tltanc commented on pull request #11002:
URL: https://github.com/apache/kafka/pull/11002#issuecomment-884570483


   
kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once
   
kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once_beta
   
   These two tests seem to be related and fail in both runs. Will manually run 
them locally.






[GitHub] [kafka] d8tltanc commented on pull request #11002: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings

2021-07-21 Thread GitBox


d8tltanc commented on pull request #11002:
URL: https://github.com/apache/kafka/pull/11002#issuecomment-884568804


   Test 4604 Failures:
   
   test_id:
kafkatest.tests.core.zookeeper_authorizer_test.ZooKeeperAuthorizerTest.test_authorizer.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   1 minute 8.011 seconds
   --
   --
   test_id:
kafkatest.tests.core.fetch_from_follower_test.FetchFromFollowerTest.test_consumer_preferred_read_replica.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   2 minutes 8.750 seconds
   --
   --
   test_id:
kafkatest.tests.core.upgrade_test.TestUpgrade.test_upgrade.from_kafka_version=0.9.0.1.to_message_format_version=None.compression_types=.lz4
   status: FAIL
   run time:   4 minutes 7.158 seconds
   --
   --
   test_id:
kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery.processing_guarantee=exactly_once_beta
   status: FAIL
   run time:   9 minutes 56.437 seconds
   --
   --
   test_id:
kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once
   status: FAIL
   run time:   10 minutes 8.148 seconds
   --
   --
   test_id:
kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once_beta
   status: FAIL
   run time:   10 minutes 8.626 seconds
   --
   --
   test_id:
kafkatest.tests.streams.streams_static_membership_test.StreamsStaticMembershipTest.test_rolling_bounces_will_not_trigger_rebalance_under_static_membership
   status: FAIL
   run time:   2 minutes 15.944 seconds
   --
   --
   test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_failure.clean_shutdown=False.enable_autocommit=False.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   1 minute 47.734 seconds
   --
   --
   test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_failure.clean_shutdown=True.enable_autocommit=False.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   1 minute 48.715 seconds
   --
   --
   test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_rolling_bounce.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   4 minutes 27.675 seconds
   --
   --
   test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_static_consumer_persisted_after_rejoin.bounce_mode=all.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   2 minutes 0.632 seconds
   --
   --
   test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_static_consumer_persisted_after_rejoin.bounce_mode=rolling.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   4 minutes 36.821 seconds
   --
   --
   test_id:
kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_bounce.broker_type=leader.security_protocol=PLAINTEXT.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   4 minutes 59.358 seconds
   --
   --
   test_id:
kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_bounce.security_protocol=PLAINTEXT.broker_type=leader.compression_type=gzip.tls_version=TLSv1.2.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   3 minutes 10.650 seconds
   --
   --
   test_id:
kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_bounce.broker_type=leader.security_protocol=SASL_SSL.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   5 minutes 46.542 seconds
   --
   --
   test_id:
kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_bounce.security_protocol=PLAINTEXT.broker_type=leader.compression_type=gzip.tls_version=TLSv1.3.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   3 minutes 11.345 seconds
   --
   --
   test_id:
kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=hard_bounce.broker_type=leader.security_protocol=PLAINTEXT.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   6 minutes 13.538 seconds
   --
   --
   test_id:
kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=hard_bounce.broker_type=leader.security_protocol=SASL_SSL.client_sasl_mechanism=PLAIN.interbroker_sasl_mechanism=GSSAPI.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   5 minutes 28.945 seconds
   --
   --
   test_id:
kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=hard_bounce.broker_type=leader.security_protocol=SASL_SSL.client_sasl_mechanism=PLAIN.interbroker_sasl_mechanism=PLAIN.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   5 minutes 26.515 seconds
   --
   --
   

[GitHub] [kafka] d8tltanc commented on pull request #11002: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings

2021-07-21 Thread GitBox


d8tltanc commented on pull request #11002:
URL: https://github.com/apache/kafka/pull/11002#issuecomment-884566723


   System test run 4607 failures:
   
   test_id:
kafkatest.tests.core.zookeeper_authorizer_test.ZooKeeperAuthorizerTest.test_authorizer.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   58.147 seconds
   --
   --
   test_id:
kafkatest.tests.core.fetch_from_follower_test.FetchFromFollowerTest.test_consumer_preferred_read_replica.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   2 minutes 8.162 seconds
   --
   --
   test_id:
kafkatest.tests.core.transactions_test.TransactionsTest.test_transactions.failure_mode=hard_bounce.bounce_target=clients.check_order=True.use_group_metadata=True
   status: FAIL
   run time:   2 minutes 16.481 seconds
   --
   --
   test_id:
kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once
   status: FAIL
   run time:   10 minutes 12.060 seconds
   --
   --
   test_id:
kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once_beta
   status: FAIL
   run time:   10 minutes 6.530 seconds
   --
   --
   test_id:
kafkatest.tests.streams.streams_smoke_test.StreamsSmokeTest.test_streams.processing_guarantee=at_least_once.crash=False.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   4 minutes 7.642 seconds
   --
   --
   test_id:
kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_metadata_upgrade.from_version=0.10.2.2.to_version=3.1.0-SNAPSHOT
   status: FAIL
   run time:   8 minutes 11.668 seconds
   --
   --
   test_id:
kafkatest.tests.streams.streams_static_membership_test.StreamsStaticMembershipTest.test_rolling_bounces_will_not_trigger_rebalance_under_static_membership
   status: FAIL
   run time:   2 minutes 15.825 seconds
   --
   --
   test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_failure.clean_shutdown=False.enable_autocommit=True.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   1 minute 20.948 seconds
   --
   --
   test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   1 minute 15.040 seconds
   --
   --
   test_id:
kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_version_probing_upgrade
   status: FAIL
   run time:   2 minutes 11.466 seconds
   --
   --
   test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_rolling_bounce.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   4 minutes 25.921 seconds
   --
   --
   test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_static_consumer_persisted_after_rejoin.bounce_mode=rolling.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   3 minutes 34.687 seconds
   --
   --
   test_id:
kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_bounce.broker_type=leader.security_protocol=PLAINTEXT.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   4 minutes 21.836 seconds
   --
   --
   test_id:
kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_bounce.broker_type=leader.security_protocol=SASL_SSL.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   4 minutes 43.409 seconds
   --
   --
   test_id:
kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_bounce.security_protocol=PLAINTEXT.broker_type=leader.compression_type=gzip.tls_version=TLSv1.3.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   2 minutes 44.057 seconds
   --
   --
   test_id:
kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_bounce.security_protocol=PLAINTEXT.broker_type=leader.compression_type=gzip.tls_version=TLSv1.2.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   5 minutes 47.045 seconds
   --
   --
   test_id:
kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=clean_shutdown.security_protocol=PLAINTEXT.broker_type=leader.compression_type=gzip.tls_version=TLSv1.2.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   2 minutes 0.945 seconds
   --
   --
   test_id:
kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=hard_bounce.broker_type=leader.security_protocol=PLAINTEXT.metadata_quorum=REMOTE_KRAFT
   status: FAIL
   run time:   6 minutes 43.349 seconds
   --
   --
   test_id:
kafkatest.tests.core.replication_test.ReplicationTest.test_replication_with_broker_failure.failure_mode=hard_bounce.broker_

[GitHub] [kafka] ableegoldman commented on pull request #11103: HOTFIX: Set session interval back to 10s for StreamsCooperativeRebalanceUpgradeTest

2021-07-21 Thread GitBox


ableegoldman commented on pull request #11103:
URL: https://github.com/apache/kafka/pull/11103#issuecomment-884541103


   Kicked off system test run with 30 repeats -- 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4611/






[GitHub] [kafka] jolshan commented on a change in pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

2021-07-21 Thread GitBox


jolshan commented on a change in pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#discussion_r674387908



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -332,6 +329,9 @@ public FetchRequestData build() {
 iter.remove();
 // Indicate that we no longer want to listen to this 
partition.
 removed.add(topicPartition);
+// If we do not have this topic ID in the session, we can 
not use topic IDs

Review comment:
   One alternative to this approach is to only do this check when no 
partitions were added to the builder, but I'm not sure this added complexity is 
worth it. The method here is also safer.








[GitHub] [kafka] jolshan opened a new pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

2021-07-21 Thread GitBox


jolshan opened a new pull request #11104:
URL: https://github.com/apache/kafka/pull/11104


   The FetchSessionHandler had a small bug in the session build method: we did 
not consider building a session where no partitions were added and the session 
previously did not use topic IDs (i.e., it relied on at least one partition 
being added to signal whether topic IDs were present).
   
   Due to this, we could send forgotten partitions with the zero UUID, which 
would always result in an exception and a closed session.
   
   This PR fixes the logic to check that any forgotten partitions have topic 
IDs. There is also a test added for the empty session situation when topic IDs 
are used and when topic names are used.
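   
   A minimal sketch of the idea behind the fix, using hypothetical names 
(`ForgottenTopicCheck`, `canUseTopicIds`) rather than the actual 
FetchSessionHandler internals: the request may only use topic IDs if every 
forgotten partition also has a usable, non-zero topic ID in the session.
   
{code:java}
import java.util.List;
import java.util.UUID;

// Illustrative only; the real logic lives in FetchSessionHandler's build method.
final class ForgottenTopicCheck {
    // Kafka uses its own Uuid type with a ZERO_UUID constant; java.util.UUID
    // stands in for it here.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Start from whether the partitions added to the builder all had topic IDs,
    // then additionally require that every forgotten partition has a non-zero ID,
    // so a forgotten topic is never sent with the zero UUID.
    static boolean canUseTopicIds(boolean addedPartitionsHaveIds, List<UUID> forgottenTopicIds) {
        boolean forgottenHaveIds = forgottenTopicIds.stream()
            .allMatch(id -> id != null && !id.equals(ZERO_UUID));
        return addedPartitionsHaveIds && forgottenHaveIds;
    }
}
{code}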
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] ableegoldman opened a new pull request #11103: HOTFIX: Set session interval back to 10s for StreamsCooperativeRebalanceUpgradeTest

2021-07-21 Thread GitBox


ableegoldman opened a new pull request #11103:
URL: https://github.com/apache/kafka/pull/11103


   This test is hitting pretty frequent timeouts after bouncing a node and 
waiting for it to come back and fully rejoin the group. It now seems to take 
45s for the initial JoinGroup to succeed, which I suspect is due to the new 
default session.timeout.ms (its default was recently raised to 45s). Let's try 
pinning this config to the old value of 10s and see if that helps it rejoin in 
time.
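
   For reference, a minimal sketch of such an override on the Java client side, 
assuming it is applied through a Streams `Properties` object (the 
`SessionTimeoutOverride` helper is hypothetical; the system test wires the 
config through its own service properties):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class SessionTimeoutOverride {
    // Pin the consumer session timeout back to the pre-3.0 default of 10s so a
    // bounced instance can rejoin the group before the test's wait times out.
    public static Properties testProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 10_000);
        return props;
    }
}
{code}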






[GitHub] [kafka] ijuma commented on pull request #11101: MINOR: Remove redundant fields in dump log record output

2021-07-21 Thread GitBox


ijuma commented on pull request #11101:
URL: https://github.com/apache/kafka/pull/11101#issuecomment-884537914


   Can we add a test to DumpLogSegmentsTest? I fear we will regress again in 
the future, otherwise.






[GitHub] [kafka] niket-goel commented on pull request #11053: KAFKA-13015 Ducktape System Tests for Metadata Snapshots

2021-07-21 Thread GitBox


niket-goel commented on pull request #11053:
URL: https://github.com/apache/kafka/pull/11053#issuecomment-884534694


   Rebased the branch onto trunk.






[GitHub] [kafka] niket-goel opened a new pull request #11102: Renaming the internal metadata topic to "__cluster_metadata"

2021-07-21 Thread GitBox


niket-goel opened a new pull request #11102:
URL: https://github.com/apache/kafka/pull/11102


   This PR renames the internal metadata topic to "__cluster_metadata". The 
topic was previously known as "@metadata"






[GitHub] [kafka] mumrah commented on a change in pull request #11101: MINOR: Remove redundant fields in dump log record output

2021-07-21 Thread GitBox


mumrah commented on a change in pull request #11101:
URL: https://github.com/apache/kafka/pull/11101#discussion_r674342185



##
File path: core/src/main/scala/kafka/tools/DumpLogSegments.scala
##
@@ -268,13 +268,10 @@ object DumpLogSegments {
 }
 lastOffset = record.offset
 
-var prefix = s"${RecordIndent} "
+var prefix = s"$RecordIndent "
 if (!skipRecordMetadata) {
-  print(s"${prefix}offset: ${record.offset}" +
-  s" keySize: ${record.keySize} valueSize: ${record.valueSize} 
${batch.timestampType}: ${record.timestamp}" +
-  s" baseOffset: ${batch.baseOffset} lastOffset: 
${batch.lastOffset} baseSequence: ${batch.baseSequence}" +
-  s" lastSequence: ${batch.lastSequence} producerEpoch: 
${batch.producerEpoch} partitionLeaderEpoch: ${batch.partitionLeaderEpoch}" +
-  s" batchSize: ${batch.sizeInBytes} magic: ${batch.magic} 
compressType: ${batch.compressionType} position: ${validBytes}")
+  print(s"${prefix}offset: ${record.offset} 
${batch.timestampType}: ${record.timestamp} " +
+s"keysize: ${record.keySize} valuesize: ${record.valueSize}")

Review comment:
   nit: keySize and valueSize?








[GitHub] [kafka] ableegoldman commented on pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

2021-07-21 Thread GitBox


ableegoldman commented on pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#issuecomment-884482756


   Merged to trunk and cherrypicked to 2.8 and 3.0 (cc @kkonstantine)






[jira] [Commented] (KAFKA-13121) Flaky Test TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates()

2021-07-21 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17385093#comment-17385093
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13121:


Hey [~jolshan], thought you might have some context on this since it (sort of) 
had to do with topic ids.

The full logs aren't much but I'll upload them in case that helps: 
[^TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates.rtf]

 

> Flaky Test TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates()
> ---
>
> Key: KAFKA-13121
> URL: https://issues.apache.org/jira/browse/KAFKA-13121
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
> Attachments: 
> TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates.rtf
>
>
> h4. Stack Trace
> {code:java}
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> No resource found for partition: 
> TopicIdPartition{topicId=2B9rDu44TE6c8pLG8A0RAg, topicPartition=new-leader-0}
> at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.getRemoteLogMetadataCache(RemotePartitionMetadataStore.java:112)
>  
> at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.listRemoteLogSegments(RemotePartitionMetadataStore.java:98)
> at 
> org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.listRemoteLogSegments(TopicBasedRemoteLogMetadataManager.java:212)
> at 
> org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates(TopicBasedRemoteLogMetadataManagerTest.java:99){code}
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10921/11/testReport/





[jira] [Updated] (KAFKA-13121) Flaky Test TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates()

2021-07-21 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13121:
---
Attachment: 
TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates.rtf

> Flaky Test TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates()
> ---
>
> Key: KAFKA-13121
> URL: https://issues.apache.org/jira/browse/KAFKA-13121
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
> Attachments: 
> TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates.rtf
>
>
> h4. Stack Trace
> {code:java}
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> No resource found for partition: 
> TopicIdPartition{topicId=2B9rDu44TE6c8pLG8A0RAg, topicPartition=new-leader-0}
> at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.getRemoteLogMetadataCache(RemotePartitionMetadataStore.java:112)
>  
> at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.listRemoteLogSegments(RemotePartitionMetadataStore.java:98)
> at 
> org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.listRemoteLogSegments(TopicBasedRemoteLogMetadataManager.java:212)
> at 
> org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates(TopicBasedRemoteLogMetadataManagerTest.java:99){code}
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10921/11/testReport/





[jira] [Created] (KAFKA-13121) Flaky Test TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates()

2021-07-21 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13121:
--

 Summary: Flaky Test 
TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates()
 Key: KAFKA-13121
 URL: https://issues.apache.org/jira/browse/KAFKA-13121
 Project: Kafka
  Issue Type: Bug
  Components: log
Reporter: A. Sophie Blee-Goldman


h4. Stack Trace
{code:java}
org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: No 
resource found for partition: TopicIdPartition{topicId=2B9rDu44TE6c8pLG8A0RAg, 
topicPartition=new-leader-0}
at 
org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.getRemoteLogMetadataCache(RemotePartitionMetadataStore.java:112)
 
at 
org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.listRemoteLogSegments(RemotePartitionMetadataStore.java:98)
at 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.listRemoteLogSegments(TopicBasedRemoteLogMetadataManager.java:212)
at 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates(TopicBasedRemoteLogMetadataManagerTest.java:99){code}
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10921/11/testReport/





[jira] [Updated] (KAFKA-13120) Flesh out `streams_static_membership_test` to be more robust

2021-07-21 Thread Leah Thomas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leah Thomas updated KAFKA-13120:

Description: When fixing the `streams_static_membership_test.py` we noticed 
that the test is pretty bare bones, it creates a streams application but 
doesn't do much with the streams application, eg has no stateful processing. We 
should flesh this out a bit to be more realistic and potentially consider 
testing with EOS as well. The full java test is in `StaticMembershipTestClient` 
 (was: When fixing the `streams_static_membership_test.py` we noticed that the 
test is pretty bare bones, it creates a streams application but doesn't ever 
send data through it or do much with the streams application. We should flesh 
this out a bit to be more realistic. The full java test is in 
`StaticMembershipTestClient`)

> Flesh out `streams_static_membership_test` to be more robust
> 
>
> Key: KAFKA-13120
> URL: https://issues.apache.org/jira/browse/KAFKA-13120
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Leah Thomas
>Priority: Minor
>
> When fixing the `streams_static_membership_test.py` we noticed that the test 
> is pretty bare bones, it creates a streams application but doesn't do much 
> with the streams application, eg has no stateful processing. We should flesh 
> this out a bit to be more realistic and potentially consider testing with EOS 
> as well. The full java test is in `StaticMembershipTestClient`





[GitHub] [kafka] ableegoldman merged pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

2021-07-21 Thread GitBox


ableegoldman merged pull request #10921:
URL: https://github.com/apache/kafka/pull/10921


   






[GitHub] [kafka] ableegoldman commented on a change in pull request #10881: KAFKA-12947 Replace EasyMock and PowerMock with Mockito for StreamsMe…

2021-07-21 Thread GitBox


ableegoldman commented on a change in pull request #10881:
URL: https://github.com/apache/kafka/pull/10881#discussion_r674307316



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
##
@@ -23,43 +23,32 @@
 import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import 
org.apache.kafka.streams.kstream.internals.graph.TableSourceNode.TableSourceNodeBuilder;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.easymock.EasyMock;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.Properties;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({InternalTopologyBuilder.class})
+import static org.mockito.Mockito.mock;
+
 public class TableSourceNodeTest {
 
 private static final String STORE_NAME = "store-name";
 private static final String TOPIC = "input-topic";
 
-private final InternalTopologyBuilder topologyBuilder = 
PowerMock.createNiceMock(InternalTopologyBuilder.class);
+private final InternalTopologyBuilder topologyBuilder = 
mock(InternalTopologyBuilder.class);
 
 @Test
 public void 
shouldConnectStateStoreToInputTopicIfInputTopicIsUsedAsChangelog() {
 final boolean shouldReuseSourceTopicForChangelog = true;
 topologyBuilder.connectSourceStoreAndTopic(STORE_NAME, TOPIC);
-EasyMock.replay(topologyBuilder);
 
 buildTableSourceNode(shouldReuseSourceTopicForChangelog);
-
-EasyMock.verify(topologyBuilder);

Review comment:
   Admittedly I don't know much about Mockito, but it looks like we're 
removing the one verification step that's actually testing something in this 
test without replacing it. Isn't there a `verify()` for Mockito?
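
   For reference, Mockito does provide `verify()`; a minimal sketch of what the 
replaced check could look like, using the store/topic names from the test above 
(this is an assumption about how the test could be written, not the actual PR 
code):

{code:java}
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;

public class TableSourceNodeVerifySketch {
    public static void main(String[] args) {
        InternalTopologyBuilder topologyBuilder = mock(InternalTopologyBuilder.class);

        // Stand-in for exercising the code under test, which is expected to
        // connect the state store to the input topic:
        topologyBuilder.connectSourceStoreAndTopic("store-name", "input-topic");

        // Mockito verification: fails if the interaction above never happened.
        verify(topologyBuilder).connectSourceStoreAndTopic("store-name", "input-topic");
    }
}
{code}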








[GitHub] [kafka] ableegoldman edited a comment on pull request #11093: MINOR: add serde configs to properly set serdes in failing StreamsStaticMembershipTest

2021-07-21 Thread GitBox


ableegoldman edited a comment on pull request #11093:
URL: https://github.com/apache/kafka/pull/11093#issuecomment-884457416


   Merged to trunk and cherrypicked to 3.0 (cc @kkonstantine -- it's a system 
test fix)






[GitHub] [kafka] ableegoldman commented on pull request #10881: KAFKA-12947 Replace EasyMock and PowerMock with Mockito for StreamsMe…

2021-07-21 Thread GitBox


ableegoldman commented on pull request #10881:
URL: https://github.com/apache/kafka/pull/10881#issuecomment-884466044


   cc also @cadonna who's familiar with these specific tests I think






[GitHub] [kafka] ableegoldman commented on pull request #11093: MINOR: add serde configs to properly set serdes in failing StreamsStaticMembershipTest

2021-07-21 Thread GitBox


ableegoldman commented on pull request #11093:
URL: https://github.com/apache/kafka/pull/11093#issuecomment-884457416


   Merged to trunk and cherrypicked to 3.0






[GitHub] [kafka] ableegoldman merged pull request #11093: MINOR: add serde configs to properly set serdes in failing StreamsStaticMembershipTest

2021-07-21 Thread GitBox


ableegoldman merged pull request #11093:
URL: https://github.com/apache/kafka/pull/11093


   






[GitHub] [kafka] ableegoldman commented on pull request #11093: FIX: add serde configs to properly set serde

2021-07-21 Thread GitBox


ableegoldman commented on pull request #11093:
URL: https://github.com/apache/kafka/pull/11093#issuecomment-884455762


   Just some unrelated/known flaky test failures: 
`ConsumerBounceTest.testCloseDuringRebalance()` and 
`TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation`  
(the latter was recently fixed and the error message in the failure does not 
match the new code)






[GitHub] [kafka] jeqo commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

2021-07-21 Thread GitBox


jeqo commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r674249547



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##
@@ -98,7 +98,7 @@ public String newStoreName(final String prefix) {
 null,
 GLOBAL_STORE_TOPIC_NAME,
 "processorName",
-() -> ProcessorAdapter.adapt(new KTableSource<>(GLOBAL_STORE_NAME, 
GLOBAL_STORE_NAME).get()));
+() -> new KTableSource<>(GLOBAL_STORE_NAME, 
GLOBAL_STORE_NAME).get());

Review comment:
   Oh, great catch! You're right. I've rolled back the change and the test is 
working.








[GitHub] [kafka] jeqo commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

2021-07-21 Thread GitBox


jeqo commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r674249144



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##
@@ -92,33 +97,36 @@ public void init(final 
org.apache.kafka.streams.processor.ProcessorContext conte
 }
 
 @Override
-public void process(final K key, final V value) {
+public void process(final Record record) {
 // if the key is null, then ignore the record
-if (key == null) {
-LOG.warn(
+if (record.key() == null) {
+context.recordMetadata().ifPresent(recordMetadata -> LOG.warn(

Review comment:
   Agreed. I added some more context to the messages and made them 
consistent across the class. Let me know how it looks.
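
   For illustration, a hedged sketch of what such a message could look like when 
the source metadata is available (the `warnNullKey` helper is hypothetical, not 
the exact KTableSource code):

{code:java}
import java.util.Optional;

import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NullKeyWarningSketch {
    private static final Logger LOG = LoggerFactory.getLogger(NullKeyWarningSketch.class);

    // Log a warning that carries the record's source topic/partition/offset when
    // the metadata is present, and a plainer message when it is not.
    static void warnNullKey(Optional<RecordMetadata> metadata) {
        if (metadata.isPresent()) {
            RecordMetadata m = metadata.get();
            LOG.warn("Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
                m.topic(), m.partition(), m.offset());
        } else {
            LOG.warn("Skipping record due to null key. Topic, partition, and offset not known.");
        }
    }
}
{code}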








[GitHub] [kafka] iakunin edited a comment on pull request #10775: KAFKA-12668: Making MockScheduler.schedule safe to use in concurrent code

2021-07-21 Thread GitBox


iakunin edited a comment on pull request #10775:
URL: https://github.com/apache/kafka/pull/10775#issuecomment-884402579


   @jsancio got it. Thanks for explaining that to me. I'll be waiting patiently :)






[GitHub] [kafka] iakunin commented on pull request #10775: KAFKA-12668: Making MockScheduler.schedule safe to use in concurrent code

2021-07-21 Thread GitBox


iakunin commented on pull request #10775:
URL: https://github.com/apache/kafka/pull/10775#issuecomment-884402579


   @jsancio got it. Thanks for explaining that to me. I will wait patiently :)






[GitHub] [kafka] hachikuji opened a new pull request #11101: MINOR: Remove redundant fields in dump log record output

2021-07-21 Thread GitBox


hachikuji opened a new pull request #11101:
URL: https://github.com/apache/kafka/pull/11101


   In 2.8, the dump log output regressed to print batch-level information for 
each record, which makes the output considerably noisier. This patch changes 
the output back to what it was in 2.7 and previous versions: batch metadata is 
only printed at the batch level.
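
   A hedged sketch of the intended output structure, using stand-in types 
(`BatchInfo`, `RecordInfo`) rather than the actual DumpLogSegments code: batch 
metadata is printed once per batch, and each record line carries only 
record-level fields.

{code:java}
import java.util.List;

// Stand-in types for illustration only; the real tool iterates the log's batches.
record BatchInfo(long baseOffset, long lastOffset, int sizeInBytes, List<RecordInfo> records) { }
record RecordInfo(long offset, long timestamp, int keySize, int valueSize) { }

public class DumpOutputSketch {
    static void dump(List<BatchInfo> batches) {
        for (BatchInfo batch : batches) {
            // Batch-level metadata is printed once per batch...
            System.out.printf("baseOffset: %d lastOffset: %d batchSize: %d%n",
                batch.baseOffset(), batch.lastOffset(), batch.sizeInBytes());
            for (RecordInfo rec : batch.records()) {
                // ...and each record line repeats only record-level fields.
                System.out.printf("| offset: %d CreateTime: %d keySize: %d valueSize: %d%n",
                    rec.offset(), rec.timestamp(), rec.keySize(), rec.valueSize());
            }
        }
    }
}
{code}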
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Created] (KAFKA-13120) Flesh out `streams_static_membership_test` to be more robust

2021-07-21 Thread Leah Thomas (Jira)
Leah Thomas created KAFKA-13120:
---

 Summary: Flesh out `streams_static_membership_test` to be more 
robust
 Key: KAFKA-13120
 URL: https://issues.apache.org/jira/browse/KAFKA-13120
 Project: Kafka
  Issue Type: Task
  Components: streams, system tests
Reporter: Leah Thomas


When fixing the `streams_static_membership_test.py` we noticed that the test is 
pretty bare bones, it creates a streams application but doesn't ever send data 
through it or do much with the streams application. We should flesh this out a 
bit to be more realistic. The full java test is in `StaticMembershipTestClient`





[GitHub] [kafka] cmccabe commented on a change in pull request #11053: KAFKA-13015 Ducktape System Tests for Metadata Snapshots

2021-07-21 Thread GitBox


cmccabe commented on a change in pull request #11053:
URL: https://github.com/apache/kafka/pull/11053#discussion_r674207120



##
File path: tests/kafkatest/tests/core/snapshot_test.py
##
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+METADATA_LOG_RETENTION_BYTES = "4096"
+METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "4096"
+METADATA_LOG_SEGMENT_BYTES = "900"
+METADATA_LOG_SEGMENT_MS = "1"
+
+def __init__(self, test_context):
+super(TestSnapshots, self).__init__(test_context=test_context)
+self.topics_created = 0
+self.topic = "test_topic"
+self.partitions = 3
+self.replication_factor = 3
+self.num_nodes = 3
+
+# Producer and consumer
+self.producer_throughput = 1000
+self.num_producers = 1
+self.num_consumers = 1
+
+security_protocol = 'PLAINTEXT'
+# Setup Custom Config to ensure snapshot will be generated 
deterministically
+self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+  topics={self.topic: {"partitions": 
self.partitions,
+   "replication-factor": 
self.replication_factor,
+   'configs': 
{"min.insync.replicas": 2}}},
+  server_prop_overrides=[
+  [config_property.METADATA_LOG_DIR, 
KafkaService.METADATA_LOG_DIR],
+  
[config_property.METADATA_LOG_SEGMENT_MS, 
TestSnapshots.METADATA_LOG_SEGMENT_MS],
+  
[config_property.METADATA_LOG_RETENTION_BYTES, 
TestSnapshots.METADATA_LOG_RETENTION_BYTES],
+  
[config_property.METADATA_LOG_SEGMENT_BYTES, 
TestSnapshots.METADATA_LOG_SEGMENT_BYTES],
+  
[config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, 
TestSnapshots.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS]
+  ])
+
+self.kafka.interbroker_security_protocol = security_protocol
+self.kafka.security_protocol = security_protocol
+
+def setUp(self):
+# Start the cluster and ensure that a snapshot is generated
+self.logger.info("Starting the cluster and running until snapshot 
creation")
+
+assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+"Snapshot test should be run Kraft Modes only"
+
+self.kafka.start()
+
+topic_count = 10
+self.topics_created += self.create_n_topics(topic_count)
+
+if self.kafka.remote_controller_quorum:
+controller_nodes = self.kafka.remote_controller_quorum.nodes
+else:
+controller_nodes = 
self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+# Waiting for snapshot creation and first log segment
+# cleanup on all controller nodes
+for node in controller_nodes:
+self.logger.debug("Waiting for snapshot on: %s" % 
self.kafka.who_am_i(node))
+self.wait_for_log_segment_delete(node)
+self.wait_for_snapshot(node)
+self.logger.debug("Verified Snapshots exist on controller nodes")
+
+def create_n_topics(self, topic_count):
+for i in range(self.topics_created, topic_count):
+topic = "test_topic_%d" % i
+print("Creating topic %s" % topic, flush=True)
+topic_cfg = {
+"topic": topic,
+"partitions": self.partitions,
+

[GitHub] [kafka] cmccabe commented on a change in pull request #11053: KAFKA-13015 Ducktape System Tests for Metadata Snapshots

2021-07-21 Thread GitBox


cmccabe commented on a change in pull request #11053:
URL: https://github.com/apache/kafka/pull/11053#discussion_r674205413



##
File path: tests/kafkatest/tests/core/snapshot_test.py
##
@@ -0,0 +1,224 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+import time
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+METADATA_LOG_RETENTION_BYTES = "2048"
+METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "2048"
+METADATA_LOG_SEGMENT_BYTES = "900"
+METADATA_LOG_SEGMENT_MS = "1"
+TOPIC_NAME_PREFIX = "test_topic_"
+
+def __init__(self, test_context):
+super(TestSnapshots, self).__init__(test_context=test_context)
+self.topics_created = 0
+self.topic = "test_topic"
+self.partitions = 3
+self.replication_factor = 3
+self.num_nodes = 3
+
+# Producer and consumer
+self.producer_throughput = 1000
+self.num_producers = 1
+self.num_consumers = 1
+
+security_protocol = 'PLAINTEXT'
+# Setup Custom Config to ensure snapshot will be generated 
deterministically
+self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+  topics={self.topic: {"partitions": 
self.partitions,
+   "replication-factor": 
self.replication_factor,
+   'configs': 
{"min.insync.replicas": 2}}},
+  server_prop_overrides=[
+  [config_property.METADATA_LOG_DIR, 
KafkaService.METADATA_LOG_DIR],
+  
[config_property.METADATA_LOG_SEGMENT_MS, 
TestSnapshots.METADATA_LOG_SEGMENT_MS],
+  
[config_property.METADATA_LOG_RETENTION_BYTES, 
TestSnapshots.METADATA_LOG_RETENTION_BYTES],
+  
[config_property.METADATA_LOG_SEGMENT_BYTES, 
TestSnapshots.METADATA_LOG_SEGMENT_BYTES],
+  
[config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, 
TestSnapshots.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS]
+  ])
+
+self.kafka.interbroker_security_protocol = security_protocol
+self.kafka.security_protocol = security_protocol
+
+def setUp(self):
+# Start the cluster and ensure that a snapshot is generated
+self.logger.info("Starting the cluster and running until snapshot 
creation")
+
+assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+"Snapshot test should be run Kraft Modes only"
+
+self.kafka.start()
+
+topic_count = 10
+self.topics_created += self.create_n_topics(topic_count)
+
+if self.kafka.remote_controller_quorum:
+self.controller_nodes = self.kafka.remote_controller_quorum.nodes
+else:
+self.controller_nodes = 
self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+# Waiting for snapshot creation and first log segment
+# cleanup on all controller nodes
+for node in self.controller_nodes:
+self.logger.debug("Waiting for snapshot on: %s" % 
self.kafka.who_am_i(node))
+self.wait_for_log_segment_delete(node)
+self.wait_for_snapshot(node)
+self.logger.debug("Verified Snapshots exist on controller nodes")
+
+def create_n_topics(self, topic_count):
+for i in range(self.topics_created, topic_count):
+topic = "%s%d" % (TestSnapshots.TOPIC_NAME_PREFIX, i)
+self.logger.debug("Creating topic %s" % topic)
+topic_cfg = {
+

[GitHub] [kafka] cmccabe commented on a change in pull request #11053: KAFKA-13015 Ducktape System Tests for Metadata Snapshots

2021-07-21 Thread GitBox


cmccabe commented on a change in pull request #11053:
URL: https://github.com/apache/kafka/pull/11053#discussion_r674203880



##
File path: tests/kafkatest/tests/core/snapshot_test.py
##
@@ -0,0 +1,224 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+import time
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+METADATA_LOG_RETENTION_BYTES = "2048"

Review comment:
   I think you can just put the constants later on in the 
`server_prop_overrides` block. There's no reason to have a special name for 
them since they're only used in one place (which is itself a constant)








[GitHub] [kafka] cmccabe commented on a change in pull request #11053: KAFKA-13015 Ducktape System Tests for Metadata Snapshots

2021-07-21 Thread GitBox


cmccabe commented on a change in pull request #11053:
URL: https://github.com/apache/kafka/pull/11053#discussion_r674202902



##
File path: tests/kafkatest/services/kafka/config.py
##
@@ -24,6 +24,11 @@ class KafkaConfig(dict):
 DEFAULTS = {
 config_property.SOCKET_RECEIVE_BUFFER_BYTES: 65536,
 config_property.LOG_DIRS: 
"/mnt/kafka/kafka-data-logs-1,/mnt/kafka/kafka-data-logs-2",
+config_property.METADATA_LOG_DIR: "/mnt/kafka/kafka-metadata-logs",
+config_property.METADATA_LOG_SEGMENT_BYTES: 8388608,
+config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS: 100,

Review comment:
   Hmm, I'm a bit confused about why we'd make something like "100 bytes 
between snapshots" the default. I guess the intention is to do a stress test?








[jira] [Created] (KAFKA-13119) Validate the KRaft controllerListener config on startup

2021-07-21 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-13119:


 Summary: Validate the KRaft controllerListener config on startup
 Key: KAFKA-13119
 URL: https://issues.apache.org/jira/browse/KAFKA-13119
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe
Assignee: Niket Goel
 Fix For: 3.0.0








[jira] [Resolved] (KAFKA-13119) Validate the KRaft controllerListener config on startup

2021-07-21 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-13119.
--
Resolution: Fixed

> Validate the KRaft controllerListener config on startup
> ---
>
> Key: KAFKA-13119
> URL: https://issues.apache.org/jira/browse/KAFKA-13119
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Niket Goel
>Priority: Blocker
> Fix For: 3.0.0
>
>






[GitHub] [kafka] cmccabe merged pull request #11070: Validate the controllerListener config on startup

2021-07-21 Thread GitBox


cmccabe merged pull request #11070:
URL: https://github.com/apache/kafka/pull/11070


   






[GitHub] [kafka] cmccabe closed pull request #10695: KAFKA-12783: Remove the deprecated ZK-based partition reassignment API

2021-07-21 Thread GitBox


cmccabe closed pull request #10695:
URL: https://github.com/apache/kafka/pull/10695


   






[GitHub] [kafka] cmccabe commented on pull request #10695: KAFKA-12783: Remove the deprecated ZK-based partition reassignment API

2021-07-21 Thread GitBox


cmccabe commented on pull request #10695:
URL: https://github.com/apache/kafka/pull/10695#issuecomment-884364747


   I guess we're going with the approach from #10471 for 3.0, so I'll close 
this one.






[jira] [Updated] (KAFKA-13118) Backport KAFKA-9887 to 3.0 branch after 3.0.0 release

2021-07-21 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-13118:
--
Description: 
We need to backport the fix (commit hash `0314801a8e`) for KAFKA-9887 to the 
`3.0` branch. That fix was merged to `trunk`, `2.8`, and `2.7` _after_ the 3.0 
code freeze, and that issue is not a blocker or regression.

Be sure to update the "fix version" on KAFKA-9887 when the backport is complete.

  was:We need to backport the fix (commit hash `0314801a8e`) for KAFKA-9887 to 
the `3.0` branch. That fix was merged to `trunk`, `2.8`, and `2.7` _after_ the 
3.0 code freeze, and that issue is not a blocker or regression.


> Backport KAFKA-9887 to 3.0 branch after 3.0.0 release
> -
>
> Key: KAFKA-13118
> URL: https://issues.apache.org/jira/browse/KAFKA-13118
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Affects Versions: 3.0.1
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Blocker
> Fix For: 3.0.1
>
>
> We need to backport the fix (commit hash `0314801a8e`) for KAFKA-9887 to the 
> `3.0` branch. That fix was merged to `trunk`, `2.8`, and `2.7` _after_ the 
> 3.0 code freeze, and that issue is not a blocker or regression.
> Be sure to update the "fix version" on KAFKA-9887 when the backport is 
> complete.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13118) Backport KAFKA-9887 to 3.0 branch after 3.0.0 release

2021-07-21 Thread Randall Hauch (Jira)
Randall Hauch created KAFKA-13118:
-

 Summary: Backport KAFKA-9887 to 3.0 branch after 3.0.0 release
 Key: KAFKA-13118
 URL: https://issues.apache.org/jira/browse/KAFKA-13118
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Affects Versions: 3.0.1
Reporter: Randall Hauch
Assignee: Randall Hauch
 Fix For: 3.0.1


We need to backport the fix (commit hash `0314801a8e`) for KAFKA-9887 to the 
`3.0` branch. That fix was merged to `trunk`, `2.8`, and `2.7` _after_ the 3.0 
code freeze, and that issue is not a blocker or regression.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9887) failed-task-count JMX metric not updated if task fails during startup

2021-07-21 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17385005#comment-17385005
 ] 

Randall Hauch commented on KAFKA-9887:
--

Unfortunately, the 3.0 code freeze was two weeks ago, so I will create a new 
issue as a blocker for 3.0.1 to backport this fix to the `3.0` branch after the 
3.0.0 release is complete, and will link it to this issue.

> failed-task-count JMX metric not updated if task fails during startup
> -
>
> Key: KAFKA-9887
> URL: https://issues.apache.org/jira/browse/KAFKA-9887
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: Chris Egerton
>Assignee: Michael Carter
>Priority: Major
> Fix For: 3.1.0, 2.7.2, 2.8.1
>
>
> If a task fails on startup (specifically, during [this code 
> section|https://github.com/apache/kafka/blob/00a59b392d92b0d6d3a321ef9a53dae4b3a9d030/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L427-L468]),
>  the {{failed-task-count}} JMX metric is not updated to reflect the task 
> failure, even though the status endpoints in the REST API do report the task 
> as failed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12851) Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable

2021-07-21 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio reassigned KAFKA-12851:
--

Assignee: Jose Armando Garcia Sancio

> Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable
> ---
>
> Key: KAFKA-12851
> URL: https://issues.apache.org/jira/browse/KAFKA-12851
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: A. Sophie Blee-Goldman
>Assignee: Jose Armando Garcia Sancio
>Priority: Blocker
> Fix For: 3.0.0
>
> Attachments: Capture.PNG
>
>
> Failed twice on a [PR 
> build|https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10755/6/testReport/]
> h3. Stacktrace
> org.opentest4j.AssertionFailedError: expected:  but was:  at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35) at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162) at 
> org.apache.kafka.raft.RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable(RaftEventSimulationTest.java:263)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13104) Controller should notify the RaftClient when it resigns

2021-07-21 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-13104.
--
Resolution: Fixed

Fixed

> Controller should notify the RaftClient when it resigns
> ---
>
> Key: KAFKA-13104
> URL: https://issues.apache.org/jira/browse/KAFKA-13104
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Reporter: Jose Armando Garcia Sancio
>Assignee: Ryan Dielhenn
>Priority: Blocker
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> {code:java}
>   private Throwable handleEventException(String name,
>  Optional 
> startProcessingTimeNs,
>  Throwable exception) {
>   ...
>   renounce();
>   return new UnknownServerException(exception);
>   }
>  {code}
> When the active controller encounters an event exception it attempts to 
> renounce leadership. Unfortunately, this doesn't tell the {{RaftClient}} that 
> it should attempt to give up leadership. This will result in inconsistent 
> state with the {{RaftClient}} as leader but with the controller as inactive.
> We should change this implementation so that the active controller asks the 
> {{RaftClient}} to resign. The active controller waits until 
> {{handleLeaderChange}} before calling {{renounce()}}
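
For illustration, a minimal sketch of the intended ordering (simplified, hypothetical stand-ins rather than the real QuorumController and RaftClient classes): on a fatal event error the active controller asks the Raft layer to resign, and only drops its own in-memory leader state once the leader-change callback confirms it is no longer leader.

{code:java}
// Hedged sketch only -- hypothetical stand-ins, not the actual Kafka classes.
interface RaftClientHandle {
    void resign(int epoch);                       // ask the Raft layer to give up leadership
}

class ActiveControllerSketch {
    private final RaftClientHandle raft;
    private int epoch;
    private boolean active = true;

    ActiveControllerSketch(final RaftClientHandle raft, final int epoch) {
        this.raft = raft;
        this.epoch = epoch;
    }

    void handleEventException(final Throwable exception) {
        // Instead of renouncing locally (which would leave the Raft layer
        // believing this node is still leader), ask the Raft client to resign.
        raft.resign(epoch);
    }

    void handleLeaderChange(final int newEpoch, final boolean stillLeader) {
        epoch = newEpoch;
        if (!stillLeader && active) {
            renounce();   // only now clear the in-memory active-controller state
        }
    }

    private void renounce() {
        active = false;
    }
}
{code}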



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #11099: KAFKA-10542: Migrate KTable mapValues, passthrough, and source to new Processor API

2021-07-21 Thread GitBox


vvcephei commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r674135420



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -558,14 +550,14 @@ public final void addStateStore(final StoreBuilder 
storeBuilder,
 nodeGroups = null;
 }
 
-public final  void addGlobalStore(final StoreBuilder 
storeBuilder,
+public final  void addGlobalStore(final 
StoreBuilder storeBuilder,

Review comment:
   Huh, I was surprised to see this change, but I figured out it's because 
of the below test. That test seems to be abusing the source node processor, so 
let's roll back this change and fix that test.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
##
@@ -120,19 +123,19 @@ public void init(final 
org.apache.kafka.streams.processor.ProcessorContext conte
 }
 
 @Override
-public void process(final K key, final Change change) {
-final V1 newValue = computeValue(key, change.newValue);
-final V1 oldValue = computeOldValue(key, change);
+public void process(final Record> record) {
+final VOut newValue = computeValue(record.key(), 
record.value().newValue);
+final VOut oldValue = computeOldValue(record.key(), 
record.value());
 
 if (queryableName != null) {
-store.put(key, ValueAndTimestamp.make(newValue, 
context().timestamp()));
-tupleForwarder.maybeForward(key, newValue, oldValue);
+store.put(record.key(), ValueAndTimestamp.make(newValue, 
record.timestamp()));
+tupleForwarder.maybeForward(record.key(), newValue, oldValue);

Review comment:
   During review, it struck me that we're just internally going to turn 
this back into a Record. We can update the internal API to accept the new 
forwarding type, but we don't need to tack that on to this PR.
   
   I filed https://issues.apache.org/jira/browse/KAFKA-13117 so we don't forget.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##
@@ -92,33 +97,36 @@ public void init(final 
org.apache.kafka.streams.processor.ProcessorContext conte
 }
 
 @Override
-public void process(final K key, final V value) {
+public void process(final Record record) {
 // if the key is null, then ignore the record
-if (key == null) {
-LOG.warn(
+if (record.key() == null) {
+context.recordMetadata().ifPresent(recordMetadata -> LOG.warn(
 "Skipping record due to null key. topic=[{}] 
partition=[{}] offset=[{}]",
-context().topic(), context().partition(), 
context().offset()
-);
+recordMetadata.topic(), recordMetadata.partition(), 
recordMetadata.offset()
+));
 droppedRecordsSensor.record();
 return;
 }
 
 if (queryableName != null) {
-final ValueAndTimestamp oldValueAndTimestamp = 
store.get(key);
-final V oldValue;
+final ValueAndTimestamp oldValueAndTimestamp = 
store.get(record.key());
+final VIn oldValue;
 if (oldValueAndTimestamp != null) {
 oldValue = oldValueAndTimestamp.value();
-if (context().timestamp() < 
oldValueAndTimestamp.timestamp()) {
-LOG.warn("Detected out-of-order KTable update for {} 
at offset {}, partition {}.",
-store.name(), context().offset(), 
context().partition());
+if (record.timestamp() < oldValueAndTimestamp.timestamp()) 
{
+context.recordMetadata().ifPresent(recordMetadata ->
+LOG.warn(
+"Detected out-of-order KTable update for {} at 
offset {}, partition {}.",

Review comment:
   Also, this is not related to your change, but it occurs to me that we're 
missing some important information here, which would likely save people a lot 
of time. Can we add the new and old timestamps to the log messages?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##
@@ -92,33 +97,36 @@ public void init(final 
org.apache.kafka.streams.processor.ProcessorContext conte
 }
 
 @Override
-public void process(final K key, final V value) {
+public void process(final Record record) {
 // if the key is null, then ignore the record
-if (key == null) {
-LOG.warn(
+if (record.key() == null) {
+context.recordMetadata().ifPresent(recordMetadata -> LOG.warn(

Review comment:

[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

2021-07-21 Thread GitBox


hachikuji commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r674140433



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
 }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-scheduler.schedule("transactionalId-expiration", () => {
-  val now = time.milliseconds()
-  inReadLock(stateLock) {
-val transactionalIdByPartition: Map[Int, 
mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-  transactionMetadataCache.flatMap { case (_, entry) =>
-entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => 
txnMetadata.state match {
-  case Empty | CompleteCommit | CompleteAbort => true
-  case _ => false
-}
-}.filter { case (_, txnMetadata) =>
-  txnMetadata.txnLastUpdateTimestamp <= now - 
config.transactionalIdExpirationMs
-}.map { case (transactionalId, txnMetadata) =>
-  val txnMetadataTransition = txnMetadata.inLock {
-txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+partitionId: Int,
+partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+val currentTimeMs = time.milliseconds()
+
+inReadLock(stateLock) {
+  val transactionPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+  replicaManager.getLogConfig(transactionPartition) match {
+case Some(logConfig) =>
+  val maxBatchSize = logConfig.maxMessageSize
+  val expired = 
mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+  lazy val recordsBuilder = MemoryRecords.builder(
+ByteBuffer.allocate(math.min(16384, maxBatchSize)),
+TransactionLog.EnforcedCompressionType,
+TimestampType.CREATE_TIME,
+0L,
+maxBatchSize
+  )
+
+  partitionCacheEntry.metadataPerTransactionalId.foreachWhile { 
(transactionalId, txnMetadata) =>
+txnMetadata.inLock {
+  if (!shouldExpire(txnMetadata, currentTimeMs)) {
+true
+  } else if (maybeAppendExpiration(txnMetadata, recordsBuilder, 
currentTimeMs, maxBatchSize)) {
+val transitMetadata = txnMetadata.prepareDead()
+expired += TransactionalIdCoordinatorEpochAndMetadata(
+  transactionalId,
+  partitionCacheEntry.coordinatorEpoch,
+  transitMetadata
+)
+true
+  } else {
+// If the batch is full, return false to end the search. Any 
remaining
+// transactionalIds eligible for expiration can be picked next 
time.
+false
   }
-  TransactionalIdCoordinatorEpochAndMetadata(transactionalId, 
entry.coordinatorEpoch, txnMetadataTransition)
 }
-  }.groupBy { transactionalIdCoordinatorEpochAndMetadata =>
-
partitionFor(transactionalIdCoordinatorEpochAndMetadata.transactionalId)
   }
 
-val recordsPerPartition = transactionalIdByPartition
-  .map { case (partition, transactionalIdCoordinatorEpochAndMetadatas) 
=>
-val deletes: Array[SimpleRecord] = 
transactionalIdCoordinatorEpochAndMetadatas.map { entry =>
-  new SimpleRecord(now, 
TransactionLog.keyToBytes(entry.transactionalId), null)
-}.toArray
-val records = 
MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, deletes: _*)
-val topicPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
-(topicPartition, records)
+  if (expired.isEmpty) {
+(Seq.empty, MemoryRecords.EMPTY)
+  } else {
+(expired, recordsBuilder.build())
   }
 
-def removeFromCacheCallback(responses: collection.Map[TopicPartition, 
PartitionResponse]): Unit = {
-  responses.forKeyValue { (topicPartition, response) =>
-inReadLock(stateLock) {
-  val toRemove = 
transactionalIdByPartition(topicPartition.partition)
-  transactionMetadataCache.get(topicPartition.partition).foreach { 
txnMetadataCacheEntry =>
-toRemove.foreach { idCoordinatorEpochAndMetadata =>
-  val transactionalId = 
idCoordinatorEpochAndMetadata.transactionalId
-  val txnMetadata = 
txnMetadataCacheEntry.metadataPerTransactionalId.get(transactionalId)
-  txnMetadata.inLock {
-if (txnMetadataCacheEntry.coordinatorEpoch == 
idCoordinatorEpochAndMetadata.coordinatorEpoch
-  && txnMetadata.pendingState.contains(Dead)
-   

[GitHub] [kafka] hachikuji commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

2021-07-21 Thread GitBox


hachikuji commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r674117013



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
 }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-scheduler.schedule("transactionalId-expiration", () => {
-  val now = time.milliseconds()
-  inReadLock(stateLock) {
-val transactionalIdByPartition: Map[Int, 
mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-  transactionMetadataCache.flatMap { case (_, entry) =>
-entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => 
txnMetadata.state match {
-  case Empty | CompleteCommit | CompleteAbort => true
-  case _ => false
-}
-}.filter { case (_, txnMetadata) =>
-  txnMetadata.txnLastUpdateTimestamp <= now - 
config.transactionalIdExpirationMs
-}.map { case (transactionalId, txnMetadata) =>
-  val txnMetadataTransition = txnMetadata.inLock {
-txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+partitionId: Int,
+partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+val currentTimeMs = time.milliseconds()
+
+inReadLock(stateLock) {
+  val transactionPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+  replicaManager.getLogConfig(transactionPartition) match {
+case Some(logConfig) =>
+  val maxBatchSize = logConfig.maxMessageSize
+  val expired = 
mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+  lazy val recordsBuilder = MemoryRecords.builder(
+ByteBuffer.allocate(math.min(16384, maxBatchSize)),

Review comment:
   Filling a batch is an uncommon scenario, so allocating a full 1MB (default 
max.message.bytes) buffer each time the task runs seemed excessive; 16K seemed 
more reasonable for the common case. Another option I considered was a 
statically allocated buffer, but that seemed like overkill.
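
   For illustration, a tiny sketch of the sizing trade-off described above (the 16384 and 1MB figures are the ones mentioned in this thread; the actual records builder is not shown):

{code:java}
import java.nio.ByteBuffer;

public class ExpirationBufferSizing {
    public static void main(String[] args) {
        int maxBatchSize = 1024 * 1024;                       // roughly the default max.message.bytes
        int initialCapacity = Math.min(16384, maxBatchSize);  // start with 16K for the common case
        ByteBuffer buffer = ByteBuffer.allocate(initialCapacity);
        System.out.println("initial capacity = " + buffer.capacity()
            + " bytes, batch size cap = " + maxBatchSize + " bytes");
    }
}
{code}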




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna merged pull request #11061: MINOR: Add test for verifying retention on changelog topics

2021-07-21 Thread GitBox


cadonna merged pull request #11061:
URL: https://github.com/apache/kafka/pull/11061


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #11061: MINOR: Add test for verifying retention on changelog topics

2021-07-21 Thread GitBox


cadonna commented on pull request #11061:
URL: https://github.com/apache/kafka/pull/11061#issuecomment-884279103


   Test failures are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #11093: FIX: add serde configs to properly set serde

2021-07-21 Thread GitBox


cadonna commented on pull request #11093:
URL: https://github.com/apache/kafka/pull/11093#issuecomment-884274934


   Build ran into a timeout. I retriggered the build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on pull request #10775: KAFKA-12668: Making MockScheduler.schedule safe to use in concurrent code

2021-07-21 Thread GitBox


jsancio commented on pull request #10775:
URL: https://github.com/apache/kafka/pull/10775#issuecomment-884274297


   @iakunin apologies for the delay in reviewing this, but a few of the 
committers and contributors have been busy getting 3.0 ready for release.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8529) Flakey test ConsumerBounceTest#testCloseDuringRebalance

2021-07-21 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17384947#comment-17384947
 ] 

Bruno Cadonna commented on KAFKA-8529:
--

Failed again:

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11093/4/testReport/kafka.api/ConsumerBounceTest/Build___JDK_11_and_Scala_2_13___testCloseDuringRebalance___2/

{code:java}
org.opentest4j.AssertionFailedError: Rebalance did not complete in time ==> 
expected:  but was: 
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193)
at 
kafka.api.ConsumerBounceTest.waitForRebalance$1(ConsumerBounceTest.scala:400)
at 
kafka.api.ConsumerBounceTest.checkCloseDuringRebalance(ConsumerBounceTest.scala:414)
at 
kafka.api.ConsumerBounceTest.testCloseDuringRebalance(ConsumerBounceTest.scala:381)
{code}

> Flakey test ConsumerBounceTest#testCloseDuringRebalance
> ---
>
> Key: KAFKA-8529
> URL: https://issues.apache.org/jira/browse/KAFKA-8529
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Boyang Chen
>Priority: Major
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/5450/consoleFull]
>  
> *16:16:10* kafka.api.ConsumerBounceTest > testCloseDuringRebalance 
> STARTED*16:16:22* kafka.api.ConsumerBounceTest.testCloseDuringRebalance 
> failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testCloseDuringRebalance.test.stdout*16:16:22*
>  *16:16:22* kafka.api.ConsumerBounceTest > testCloseDuringRebalance 
> FAILED*16:16:22* java.lang.AssertionError: Rebalance did not complete in 
> time*16:16:22* at org.junit.Assert.fail(Assert.java:89)*16:16:22* 
> at org.junit.Assert.assertTrue(Assert.java:42)*16:16:22* at 
> kafka.api.ConsumerBounceTest.waitForRebalance$1(ConsumerBounceTest.scala:402)*16:16:22*
>  at 
> kafka.api.ConsumerBounceTest.checkCloseDuringRebalance(ConsumerBounceTest.scala:416)*16:16:22*
>  at 
> kafka.api.ConsumerBounceTest.testCloseDuringRebalance(ConsumerBounceTest.scala:379)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13117) After processors, migrate TupleForwarder and CacheFlushListener

2021-07-21 Thread John Roesler (Jira)
John Roesler created KAFKA-13117:


 Summary: After processors, migrate TupleForwarder and 
CacheFlushListener
 Key: KAFKA-13117
 URL: https://issues.apache.org/jira/browse/KAFKA-13117
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


Currently, both of these interfaces take plain values in combination with 
timestamps:

CacheFlushListener:
{code:java}
void apply(K key, V newValue, V oldValue, long timestamp)
{code}
TimestampedTupleForwarder
{code:java}
 void maybeForward(K key,
   V newValue,
   V oldValue,
   long timestamp){code}
These are internally translated to the new PAPI, but after the processors are 
migrated, there won't be a need for this translation. We should update both 
of these APIs to just accept {{Record<K, Change<V>>}}.
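
For illustration, a rough sketch of what the migrated callbacks could look like (the {{Change}} class below is a simplified stand-in for the internal wrapper, and the real signatures may differ):

{code:java}
import org.apache.kafka.streams.processor.api.Record;

// Illustrative stand-in for the internal Change<V> wrapper (new/old value pair).
class Change<V> {
    final V newValue;
    final V oldValue;
    Change(final V newValue, final V oldValue) {
        this.newValue = newValue;
        this.oldValue = oldValue;
    }
}

interface CacheFlushListener<K, V> {
    // Single Record-based callback instead of (key, newValue, oldValue, timestamp).
    void apply(Record<K, Change<V>> record);
}

interface TimestampedTupleForwarder<K, V> {
    void maybeForward(Record<K, Change<V>> record);
}
{code}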



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10546) KIP-478: Deprecate the old PAPI interfaces

2021-07-21 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10546.
--
Resolution: Fixed

> KIP-478: Deprecate the old PAPI interfaces
> --
>
> Key: KAFKA-10546
> URL: https://issues.apache.org/jira/browse/KAFKA-10546
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> Can't be done until after the DSL internals are migrated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lct45 commented on pull request #11093: FIX: add serde configs to properly set serde

2021-07-21 Thread GitBox


lct45 commented on pull request #11093:
URL: https://github.com/apache/kafka/pull/11093#issuecomment-884232748


   @cadonna @ableegoldman is one of you able to merge? And potentially 
cherry-pick to the 3.0 branch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13116) KIP-724: Adjust system tests due to KAFKA-12944

2021-07-21 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-13116:

Summary: KIP-724: Adjust system tests due to KAFKA-12944  (was: Adjust 
system tests due to KAFKA-12944)

> KIP-724: Adjust system tests due to KAFKA-12944
> ---
>
> Key: KAFKA-13116
> URL: https://issues.apache.org/jira/browse/KAFKA-13116
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Blocker
> Fix For: 3.0.0
>
>
> Several system tests involving legacy message formats are failing due to 
> KAFKA-12944:
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2021-07-21--001.system-test-kafka-trunk--1626872410--confluentinc--master–038bdaa4df/report.html
> All system tests that write data with legacy message formats need to use IBP 
> 2.8 or lower.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13116) Adjust system tests due to KAFKA-12944

2021-07-21 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-13116:
---

 Summary: Adjust system tests due to KAFKA-12944
 Key: KAFKA-13116
 URL: https://issues.apache.org/jira/browse/KAFKA-13116
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma
Assignee: Ismael Juma
 Fix For: 3.0.0


Several system tests involving legacy message formats are failing due to 
KAFKA-12944:

http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2021-07-21--001.system-test-kafka-trunk--1626872410--confluentinc--master–038bdaa4df/report.html

All system tests that write data with legacy message formats need to use IBP 
2.8 or lower.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12418) Make sure it's ok not to include test jars in the release tarball

2021-07-21 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12418:

Fix Version/s: (was: 3.0.0)

> Make sure it's ok not to include test jars in the release tarball
> -
>
> Key: KAFKA-12418
> URL: https://issues.apache.org/jira/browse/KAFKA-12418
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Blocker
>
> As of [https://github.com/apache/kafka/pull/10203,] the release tarball no 
> longer includes test, sources, javadoc and test sources jars. These are 
> still published to the Maven Central repository.
> This seems like a good change and 3.0.0 would be a good time to do it, but 
> filing this JIRA to follow up and make sure before said release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12418) Make sure it's ok not to include test jars in the release tarball

2021-07-21 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-12418.
-
Resolution: Done

> Make sure it's ok not to include test jars in the release tarball
> -
>
> Key: KAFKA-12418
> URL: https://issues.apache.org/jira/browse/KAFKA-12418
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Blocker
>
> As of [https://github.com/apache/kafka/pull/10203,] the release tarball no 
> longer includes test, sources, javadoc and test sources jars. These are 
> still published to the Maven Central repository.
> This seems like a good change and 3.0.0 would be a good time to do it, but 
> filing this JIRA to follow up and make sure before said release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #11096: Adding reviewers.py to help tag reviewers in commit message

2021-07-21 Thread GitBox


ijuma commented on pull request #11096:
URL: https://github.com/apache/kafka/pull/11096#issuecomment-884179925


   Thanks, this is helpful. Any chance we can add a map that deduplicates 
emails from committers? In this PR, we could add the map and a couple of cases 
and then we can iterate as needed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #11094: MINOR: Improve usage of LogManager.currentDefaultConfig

2021-07-21 Thread GitBox


ijuma merged pull request #11094:
URL: https://github.com/apache/kafka/pull/11094


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #11094: MINOR: Improve usage of LogManager.currentDefaultConfig

2021-07-21 Thread GitBox


ijuma commented on pull request #11094:
URL: https://github.com/apache/kafka/pull/11094#issuecomment-884178111


   Test failure is unrelated: 
kafka.api.ConsumerBounceTest.testCloseDuringRebalance()


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #11100: MINOR: update doc to reflect the grace period change

2021-07-21 Thread GitBox


cadonna commented on pull request #11100:
URL: https://github.com/apache/kafka/pull/11100#issuecomment-884117221


   Cherry-picked to 3.0


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna merged pull request #11100: MINOR: update doc to reflect the grace period change

2021-07-21 Thread GitBox


cadonna merged pull request #11100:
URL: https://github.com/apache/kafka/pull/11100


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13115) doSend can be blocking

2021-07-21 Thread Ivan Vaskevych (Jira)
Ivan Vaskevych created KAFKA-13115:
--

 Summary: doSend can be blocking
 Key: KAFKA-13115
 URL: https://issues.apache.org/jira/browse/KAFKA-13115
 Project: Kafka
  Issue Type: Bug
Reporter: Ivan Vaskevych


https://github.com/apache/kafka/pull/11023



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on pull request #11100: MINOR: update doc to reflect the grace period change

2021-07-21 Thread GitBox


showuon commented on pull request #11100:
URL: https://github.com/apache/kafka/pull/11100#issuecomment-884060176


   @cadonna , thanks for the comments. PR updated. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11098: KAFKA-13099; Transactional expiration should account for max batch size

2021-07-21 Thread GitBox


dajac commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r673818846



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
 }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-scheduler.schedule("transactionalId-expiration", () => {
-  val now = time.milliseconds()
-  inReadLock(stateLock) {
-val transactionalIdByPartition: Map[Int, 
mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-  transactionMetadataCache.flatMap { case (_, entry) =>
-entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => 
txnMetadata.state match {
-  case Empty | CompleteCommit | CompleteAbort => true
-  case _ => false
-}
-}.filter { case (_, txnMetadata) =>
-  txnMetadata.txnLastUpdateTimestamp <= now - 
config.transactionalIdExpirationMs
-}.map { case (transactionalId, txnMetadata) =>
-  val txnMetadataTransition = txnMetadata.inLock {
-txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+partitionId: Int,
+partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+val currentTimeMs = time.milliseconds()
+
+inReadLock(stateLock) {
+  val transactionPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+  replicaManager.getLogConfig(transactionPartition) match {
+case Some(logConfig) =>
+  val maxBatchSize = logConfig.maxMessageSize
+  val expired = 
mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+  lazy val recordsBuilder = MemoryRecords.builder(
+ByteBuffer.allocate(math.min(16384, maxBatchSize)),

Review comment:
   For my own education, why do we use `16384` as a minimum here?

##
File path: 
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
##
@@ -629,6 +631,62 @@ class TransactionStateManagerTest {
 verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
+  @Test
+  def testTransactionExpirationShouldRespectBatchSize(): Unit = {
+val partitionIds = 0 until numPartitions
+val maxBatchSize = 512
+
+loadTransactionsForPartitions(partitionIds)
+
+val allTransactionalIds = mutable.Set.empty[String]
+for (i <- 0 to 1000) {
+  val txnlId = s"id_$i"
+  val producerId = i
+  val txnMetadata = transactionMetadata(txnlId, producerId)
+  txnMetadata.txnLastUpdateTimestamp = time.milliseconds() - 
txnConfig.transactionalIdExpirationMs
+  transactionManager.putTransactionStateIfNotExists(txnMetadata)
+  allTransactionalIds += txnlId
+}
+
+def removeExpiredTransactionalIds(): Map[TopicPartition, MemoryRecords] = {
+  EasyMock.reset(replicaManager)
+  expectLogConfig(partitionIds, maxBatchSize)
+
+  val appendedRecordsCapture = expectTransactionalIdExpiration(Errors.NONE)
+  EasyMock.replay(replicaManager)
+
+  transactionManager.removeExpiredTransactionalIds()
+  EasyMock.verify(replicaManager)
+
+  assertTrue(appendedRecordsCapture.hasCaptured)
+  appendedRecordsCapture.getValue
+}
+
+def hasUnexpiredTransactionalIds: Boolean = {
+  val unexpiredTransactions = 
transactionManager.listTransactionStates(Set.empty, Set.empty)
+.transactionStates.asScala
+  assertTrue(unexpiredTransactions.forall(txn => txn.transactionState == 
Empty.name))
+  unexpiredTransactions.nonEmpty
+}
+
+var iterations = 0
+val expiredTransactionalIds = mutable.Set.empty[String]
+while (hasUnexpiredTransactionalIds) {
+  removeExpiredTransactionalIds().forKeyValue { (_, records) =>
+assertTrue(records.sizeInBytes() < maxBatchSize)

Review comment:
   nit: We might be able to drop the `()` of `sizeInBytes` and `records` 
below.

##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
 }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-scheduler.schedule("transactionalId-expiration", () => {
-  val now = time.milliseconds()
-  inReadLock(stateLock) {
-val transactionalIdByPartition: Map[Int, 
mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-  transactionMetadataCache.flatMap { case (_, entry) =>
-entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => 
txnMetadata.state match {
-  case Empty | CompleteCommit | CompleteAbort => true
-  case _ => false
-}
-}.filter { case (_, txnMetadata) =>
-  txnMetadata.txnLastUpdateTimestamp <= now - 
confi

[GitHub] [kafka] showuon commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change

2021-07-21 Thread GitBox


showuon commented on a change in pull request #11100:
URL: https://github.com/apache/kafka/pull/11100#discussion_r673833419



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3230,13 +3229,13 @@ KTable-KTable 
Foreign-Key
 import org.apache.kafka.streams.kstream.SlidingWindows;
 
 // A sliding time window with a time difference of 10 minutes and grace period 
of 30 minutes
-Duration timeDifferenceMs = Duration.ofMinutes(10);
-Duration gracePeriodMs = Duration.ofMinutes(30);
-SlidingWindows.withTimeDifferenceAndGrace(timeDifferenceMs,gracePeriodMs);
+Duration timeDifference = Duration.ofMinutes(10);
+Duration gracePeriod = Duration.ofMinutes(30);
+SlidingWindows.ofTimeDifferenceAndGrace(timeDifference, 
gracePeriod);
  
  Note
  Sliding windows require that you set 
a grace period, as shown above. For time windows and session windows,
- setting the grace period is optional and 
defaults to 24 hours.
+ setting the grace period is optional.

Review comment:
   You're right. Remove the note.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change

2021-07-21 Thread GitBox


showuon commented on a change in pull request #11100:
URL: https://github.com/apache/kafka/pull/11100#discussion_r673833045



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3208,14 +3208,13 @@ KTable-KTable 
Foreign-Key
 import org.apache.kafka.streams.kstream.TimeWindows;
 
 // A tumbling time window with a size of 5 minutes (and, by definition, an 
implicit
-// advance interval of 5 minutes). Note the explicit grace period, as the 
current
-// default value is 24 hours, which may be larger than needed for smaller 
windows.
-Duration windowSizeMs = Duration.ofMinutes(5);
-Duration gracePeriodMs = Duration.ofMinutes(1);
-TimeWindows.of(windowSizeMs).grace(gracePeriodMs);
+// advance interval of 5 minutes).
+Duration windowSize = Duration.ofMinutes(5);
+Duration gracePeriod = Duration.ofMinutes(1);

Review comment:
   Makes sense! Added a 1-minute grace period comment:
   `A tumbling time window with a size of 5 minutes (and, by definition, an 
implicit advance interval of 5 minutes), and grace period of 1 minute.`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna closed pull request #10944: MINOR: Loose verification of startup in EOS system tests

2021-07-21 Thread GitBox


cadonna closed pull request #10944:
URL: https://github.com/apache/kafka/pull/10944


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #10944: MINOR: Loose verification of startup in EOS system tests

2021-07-21 Thread GitBox


cadonna commented on pull request #10944:
URL: https://github.com/apache/kafka/pull/10944#issuecomment-884015976


   PR #11090 got merged. I will close this for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change

2021-07-21 Thread GitBox


cadonna commented on a change in pull request #11100:
URL: https://github.com/apache/kafka/pull/11100#discussion_r673778552



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3208,14 +3208,13 @@ KTable-KTable 
Foreign-Key
 import org.apache.kafka.streams.kstream.TimeWindows;
 
 // A tumbling time window with a size of 5 minutes (and, by definition, an 
implicit
-// advance interval of 5 minutes). Note the explicit grace period, as the 
current
-// default value is 24 hours, which may be larger than needed for smaller 
windows.
-Duration windowSizeMs = Duration.ofMinutes(5);
-Duration gracePeriodMs = Duration.ofMinutes(1);
-TimeWindows.of(windowSizeMs).grace(gracePeriodMs);
+// advance interval of 5 minutes).
+Duration windowSize = Duration.ofMinutes(5);
+Duration gracePeriod = Duration.ofMinutes(1);

Review comment:
   Here I would either mention the grace period in the comment above or 
remove the grace period completely. Otherwise, it comes as a bit of a surprise 
that a grace period is specified.

##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3230,13 +3229,13 @@ KTable-KTable 
Foreign-Key
 import org.apache.kafka.streams.kstream.SlidingWindows;
 
 // A sliding time window with a time difference of 10 minutes and grace period 
of 30 minutes
-Duration timeDifferenceMs = Duration.ofMinutes(10);
-Duration gracePeriodMs = Duration.ofMinutes(30);
-SlidingWindows.withTimeDifferenceAndGrace(timeDifferenceMs,gracePeriodMs);
+Duration timeDifference = Duration.ofMinutes(10);
+Duration gracePeriod = Duration.ofMinutes(30);
+SlidingWindows.ofTimeDifferenceAndGrace(timeDifference, 
gracePeriod);
  
  Note
  Sliding windows require that you set 
a grace period, as shown above. For time windows and session windows,
- setting the grace period is optional and 
defaults to 24 hours.
+ setting the grace period is optional.

Review comment:
   That is not true anymore. More precisely, it is only true for deprecated 
methods, which we do not want to mention in the docs, right? Maybe you can even 
remove the note completely, since it explains a difference between sliding 
windows and other windows that does not exist anymore.

##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3322,8 +3321,8 @@ KTable-KTable 
Foreign-Key

The key parts of this program are:

-   grace(ofMinutes(10))
-   This allows us to bound the lateness of 
events the window will accept.
+   
ofSizeAndGrace(Duration.ofHours(1), ofMinutes(10))
+   The ofMinutes(10) 
parameter allows us to bound the lateness of events the window will accept.

Review comment:
   ```suggestion
The specified grace period of 10 
minutes (i.e., the ofMinutes(10) argument) allows us to bound the 
lateness of events the window will accept.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change

2021-07-21 Thread GitBox


showuon commented on a change in pull request #11100:
URL: https://github.com/apache/kafka/pull/11100#discussion_r673759493



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -204,7 +204,7 @@ public long size() {
  * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
  * @return this updated builder
  * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
- * @deprecated since 3.0 Use {@link 
#ofTimeDifferenceWithNoGrace(Duration)} instead
+ * @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, 
Duration)} instead
  */
 @Deprecated
 public JoinWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {

Review comment:
   We should use `ofTimeDifferenceAndGrace` for original `grace` method. 
Fix it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change

2021-07-21 Thread GitBox


showuon commented on a change in pull request #11100:
URL: https://github.com/apache/kafka/pull/11100#discussion_r673735822



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -138,7 +138,7 @@ public static JoinWindows ofTimeDifferenceWithNoGrace(final 
Duration timeDiffere
  * @param timeDifference
  * @return a new JoinWindows object with the window definition with and 
grace period (default to 24 hours minus {@code timeDifference})
  * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
- * @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, 
Duration)} instead
+ * @deprecated since 3.0 Use {@link 
#ofTimeDifferenceWithNoGrace(Duration)}} instead
  */
 @Deprecated
 public static JoinWindows of(final Duration timeDifference) throws 
IllegalArgumentException {

Review comment:
   We should use `ofTimeDifferenceWithNoGrace` for original `of` method. 
Fix it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11100: MINOR: update doc to reflect the grace period change

2021-07-21 Thread GitBox


showuon commented on pull request #11100:
URL: https://github.com/apache/kafka/pull/11100#issuecomment-883965278


   @ableegoldman @mjsax @cadonna , please take a look. I think this PR should 
also be merged into v3.0. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change

2021-07-21 Thread GitBox


showuon commented on a change in pull request #11100:
URL: https://github.com/apache/kafka/pull/11100#discussion_r673733752



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -138,7 +138,7 @@ public static JoinWindows ofTimeDifferenceWithNoGrace(final 
Duration timeDiffere
  * @param timeDifference
  * @return a new JoinWindows object with the window definition with and 
grace period (default to 24 hours minus {@code timeDifference})
  * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
- * @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, 
Duration)} instead
+ * @deprecated since 3.0 Use {@link 
#ofTimeDifferenceWithNoGrace(Duration)}} instead

Review comment:
   We should use `ofTimeDifferenceWithNoGrace` for original `of` method. 
Fix it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change

2021-07-21 Thread GitBox


showuon commented on a change in pull request #11100:
URL: https://github.com/apache/kafka/pull/11100#discussion_r673732882



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3208,14 +3208,13 @@ KTable-KTable 
Foreign-Key
 import org.apache.kafka.streams.kstream.TimeWindows;
 
 // A tumbling time window with a size of 5 minutes (and, by definition, an 
implicit
-// advance interval of 5 minutes). Note the explicit grace period, as the 
current
-// default value is 24 hours, which may be larger than needed for smaller 
windows.

Review comment:
   remove this note:
   `Note the explicit grace period, as the current
   default value is 24 hours, which may be larger than needed for smaller 
windows.`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change

2021-07-21 Thread GitBox


showuon commented on a change in pull request #11100:
URL: https://github.com/apache/kafka/pull/11100#discussion_r673731818



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3164,9 +3164,9 @@ KTable-KTable 
Foreign-Key
 
 // A hopping time window with a size of 5 minutes and an advance interval of 1 
minute.
 // The window's name -- the string parameter -- is used to e.g. name the 
backing state store.
-Duration windowSizeMs = Duration.ofMinutes(5);
-Duration advanceMs = Duration.ofMinutes(1);
-TimeWindows.of(windowSizeMs).advanceBy(advanceMs);
+Duration windowSize = Duration.ofMinutes(5);
+Duration advance = Duration.ofMinutes(1);
+TimeWindows.ofSizeWithNoGrace(windowSize).advanceBy(advance);

Review comment:
   Update the variable name here since these are not `Long` type to 
represent ms value. They are Duration type. 

##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3164,9 +3164,9 @@ KTable-KTable 
Foreign-Key
 
 // A hopping time window with a size of 5 minutes and an advance interval of 1 
minute.
 // The window's name -- the string parameter -- is used to e.g. name the 
backing state store.
-Duration windowSizeMs = Duration.ofMinutes(5);
-Duration advanceMs = Duration.ofMinutes(1);
-TimeWindows.of(windowSizeMs).advanceBy(advanceMs);
+Duration windowSize = Duration.ofMinutes(5);
+Duration advance = Duration.ofMinutes(1);
+TimeWindows.ofSizeWithNoGrace(windowSize).advanceBy(advance);

Review comment:
   Update the variable name here since these are not `Long` type to 
represent ms value (as before). They are Duration type. 

##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3164,9 +3164,9 @@ KTable-KTable 
Foreign-Key
 
 // A hopping time window with a size of 5 minutes and an advance interval of 1 
minute.
 // The window's name -- the string parameter -- is used to e.g. name the 
backing state store.
-Duration windowSizeMs = Duration.ofMinutes(5);
-Duration advanceMs = Duration.ofMinutes(1);
-TimeWindows.of(windowSizeMs).advanceBy(advanceMs);
+Duration windowSize = Duration.ofMinutes(5);
+Duration advance = Duration.ofMinutes(1);
+TimeWindows.ofSizeWithNoGrace(windowSize).advanceBy(advance);

Review comment:
   Update the variable name here since these are not `Long` type to 
represent ms value (as before). They are Duration type now. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change

2021-07-21 Thread GitBox


showuon commented on a change in pull request #11100:
URL: https://github.com/apache/kafka/pull/11100#discussion_r673731818



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3164,9 +3164,9 @@ KTable-KTable 
Foreign-Key
 
 // A hopping time window with a size of 5 minutes and an advance interval of 1 
minute.
 // The window's name -- the string parameter -- is used to e.g. name the 
backing state store.
-Duration windowSizeMs = Duration.ofMinutes(5);
-Duration advanceMs = Duration.ofMinutes(1);
-TimeWindows.of(windowSizeMs).advanceBy(advanceMs);
+Duration windowSize = Duration.ofMinutes(5);
+Duration advance = Duration.ofMinutes(1);
+TimeWindows.ofSizeWithNoGrace(windowSize).advanceBy(advance);

Review comment:
   These should be Duration type variable, not `Long` type to represent ms 
value.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11100: MINOR: update doc to reflect the grace period change

2021-07-21 Thread GitBox


showuon commented on a change in pull request #11100:
URL: https://github.com/apache/kafka/pull/11100#discussion_r673730072



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -650,7 +650,7 @@ 

[GitHub] [kafka] showuon opened a new pull request #11100: MINOR: update doc to reflect the grace period change

2021-07-21 Thread GitBox


showuon opened a new pull request #11100:
URL: https://github.com/apache/kafka/pull/11100


   Found this issue while reading the Streams docs. KIP-633 removed the default 
24-hour grace period and deprecated some of the grace methods, but we forgot to 
update the Streams docs.
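
   For illustration, a short sketch of window construction after KIP-633, using the method names shown in the diffs above (the durations are arbitrary):

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.TimeWindows;

public class GracePeriodExamples {
    public static void main(String[] args) {
        // Explicit grace period: replaces the deprecated of(...).grace(...) combination.
        TimeWindows tumbling =
            TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1));

        // No grace period at all: replaces relying on the old 24-hour default.
        TimeWindows strict = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));

        JoinWindows join =
            JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(10), Duration.ofMinutes(1));

        System.out.println(tumbling + " " + strict + " " + join);
    }
}
{code}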
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org