[GitHub] [kafka] fantayeneh opened a new pull request #8576: format with correct syntax

2020-04-28 Thread GitBox


fantayeneh opened a new pull request #8576:
URL: https://github.com/apache/kafka/pull/8576


   Small change to fix string formatting.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] steverod commented on pull request #8542: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…

2020-04-28 Thread GitBox


steverod commented on pull request #8542:
URL: https://github.com/apache/kafka/pull/8542#issuecomment-620893016


   retest this please
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test

2020-04-28 Thread GitBox


abbccdda commented on a change in pull request #8518:
URL: https://github.com/apache/kafka/pull/8518#discussion_r416879514



##
File path: tests/kafkatest/tests/core/downgrade_test.py
##
@@ -67,11 +67,18 @@ def setup_services(self, kafka_version, compression_types, security_protocol):
  version=kafka_version)
 self.producer.start()
 
+static_membership = kafka_version == DEV_BRANCH or kafka_version >= LATEST_2_3

Review comment:
   I see, makes sense.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416916295



##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##
@@ -41,132 +54,107 @@
 import static org.easymock.EasyMock.replay;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
-import org.easymock.EasyMock;
-import org.junit.Test;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 
 public class HighAvailabilityTaskAssignorTest {
-    private long acceptableRecoveryLag = 100L;
-    private int balanceFactor = 1;
-    private int maxWarmupReplicas = 2;
-    private int numStandbyReplicas = 0;
-    private long probingRebalanceInterval = 60 * 1000L;
-
-    private Map<UUID, ClientState> clientStates = new HashMap<>();
-    private Set<TaskId> allTasks = new HashSet<>();
-    private Set<TaskId> statefulTasks = new HashSet<>();
-
-    private ClientState client1;
-    private ClientState client2;
-    private ClientState client3;
-
-    private HighAvailabilityTaskAssignor taskAssignor;
-
-    private void createTaskAssignor() {
-        final AssignmentConfigs configs = new AssignmentConfigs(
-            acceptableRecoveryLag,
-            balanceFactor,
-            maxWarmupReplicas,
-            numStandbyReplicas,
-            probingRebalanceInterval
-        );
-        taskAssignor = new HighAvailabilityTaskAssignor(
-            clientStates,
-            allTasks,
-            statefulTasks,
-            configs);
-    }
+    private final AssignmentConfigs configWithoutStandbys = new AssignmentConfigs(
+        /*acceptableRecoveryLag*/ 100L,
+        /*balanceFactor*/ 1,
+        /*maxWarmupReplicas*/ 2,
+        /*numStandbyReplicas*/ 0,
+        /*probingRebalanceIntervalMs*/ 60 * 1000L
+    );
+
+    private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
+        /*acceptableRecoveryLag*/ 100L,
+        /*balanceFactor*/ 1,
+        /*maxWarmupReplicas*/ 2,
+        /*numStandbyReplicas*/ 1,
+        /*probingRebalanceIntervalMs*/ 60 * 1000L
+    );
 
-    @Test
-    public void shouldDecidePreviousAssignmentIsInvalidIfThereAreUnassignedActiveTasks() {
-        client1 = EasyMock.createNiceMock(ClientState.class);
-        expect(client1.prevActiveTasks()).andReturn(singleton(TASK_0_0));
-        expect(client1.prevStandbyTasks()).andStubReturn(EMPTY_TASKS);
-        replay(client1);
-        allTasks = mkSet(TASK_0_0, TASK_0_1);
-        clientStates = singletonMap(UUID_1, client1);
-        createTaskAssignor();
 
-        assertFalse(taskAssignor.previousAssignmentIsValid());

Review comment:
   Since you have a follow-on PR that touches this method, I'll leave it 
alone and just proceed to merge. We should consider both of these options in 
the follow-on.
   
   Thanks!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8578:
URL: https://github.com/apache/kafka/pull/8578#discussion_r416969713



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
##
@@ -101,8 +102,8 @@ public void before() throws Exception {
     builder = new StreamsBuilder();
     createTopics();
     streamsConfiguration = new Properties();
-    final String applicationId = "global-thread-shutdown-test" + testName.getMethodName();
-    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+    final String safeTestName = safeUniqueTestName(getClass(), testName);
+    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);

Review comment:
   I've standardized all the usages to be just "app", followed by the 
generated name, since the generated name contains the same information that we 
previously hand-wrote into the prefix or suffix. All we really need to do is 
ensure that the app id won't collide with a group name that we might use in a 
verification consumer, for example. For that reason, I've never used the 
generated name "plain", but always scoped it to the usage (app id, group id, 
input topic, etc.).
   
   It's not super important to apply these ideas universally, but I felt it 
would make it easier to write more tests like it in the future if I just made a 
full pass on all the tests to make them all look the same.
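
   For readers following along: the helper referenced here is `safeUniqueTestName(getClass(), testName)`. A minimal sketch of what such a helper might do (a hypothetical implementation, not the actual Kafka utility) is:
   ```java
   // Hypothetical sketch of a safeUniqueTestName-style helper; the real Kafka utility may differ.
   import java.util.concurrent.atomic.AtomicInteger;
   import org.junit.rules.TestName;

   public final class TestNames {
       private static final AtomicInteger SEQUENCE = new AtomicInteger();

       private TestNames() {}

       // Combines class name, method name, and a counter, then replaces characters
       // that are unsafe in topic names, app ids, and consumer group ids.
       public static String safeUniqueTestName(final Class<?> testClass, final TestName testName) {
           final String raw = testClass.getSimpleName() + testName.getMethodName() + SEQUENCE.incrementAndGet();
           return raw.replaceAll("[^a-zA-Z0-9]", "_");
       }
   }
   ```
   Scoped names like `"app-" + safeUniqueTestName(getClass(), testName)` then keep app ids, group ids, and topic names from colliding, as described above.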

##
File path: streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
##
@@ -88,16 +89,17 @@
     private String stateStoreName;
 
     @Rule
-    public TestName name = new TestName();
+    public TestName testName = new TestName();
 
     @Before
     public void before() {
-        inputTopicName = "input-topic-" + name.getMethodName();
-        outputTopicName = "output-topic-" + name.getMethodName();
-        stateStoreName = "lagfetch-test-store" + name.getMethodName();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        inputTopicName = "input-topic-" + safeTestName;
+        outputTopicName = "output-topic-" + safeTestName;
+        stateStoreName = "lagfetch-test-store" + safeTestName;
 
         streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "lag-fetch-" + name.getMethodName());
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "lag-fetch-" + safeTestName);

Review comment:
```suggestion
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
```

##
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -145,12 +162,12 @@ public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, fina
         }
     }
 
-    public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
-        driver.cleanUp();
+    public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
         try {
+            driver.cleanUp();
             cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
-        } catch (final InterruptedException e) {
-            throw new RuntimeException(e);
+        } catch (final RuntimeException | InterruptedException e) {
+            LOG.warn("Ignoring failure to clean test state", e);
         }

Review comment:
   This is really the fix for KAFKA-9875. The other change just hopefully 
reduces the probability that ignoring the exceptions could cause subsequent 
failures (e.g., if the topics don't get deleted before the next test, at least 
the next one will have different topic names).
   
   I've verified that all usages of this method are ok to ignore potential 
exceptions. Namely, as long as the test logic itself doesn't want to ensure 
that any topics got deleted, and as long as this method is the last line in the 
method, then it should be fine just to ignore failures here.
   
   I also considered just deleting the method, but if it does succeed, then it 
leaves less garbage around for subsequent tests, so it feels better to at least 
attempt a cleanup.
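
   To make the "last line in the method" constraint concrete, the call sites in this PR follow this shape (a fragment adapted from the diffs elsewhere in this thread; `CLUSTER`, the builder, and the test body are assumed context):
   ```java
   final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
   try {
       // ... start streams, produce input, assert on output ...
   } finally {
       kafkaStreams.close();
       // Deliberately the last statement: failures here are only logged,
       // so they cannot mask a real assertion failure from the try block.
       IntegrationTestUtils.quietlyCleanStateAfterTest(CLUSTER, kafkaStreams);
   }
   ```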

##
File path: streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
##
@@ -106,7 +106,9 @@ public static void startKafkaStreamsAndWaitForRunningState(final KafkaStreams ka
     kafkaStreams.start();
     assertThat(
         "KafkaStreams did not transit to RUNNING state within " + timeoutMs + " milli seconds.",
-        countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS), equalTo(true));
+        countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS),
+        equalTo(true)
+    );

Review comment:
   just fixing the formatting.





[GitHub] [kafka] guozhangwang commented on pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


guozhangwang commented on pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#issuecomment-620847306


   I looked at the three failed tests:
   
   * `OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore` is actually due to the issue that https://github.com/apache/kafka/pull/8548 tried to fix. Waiting for @vvcephei to review 8548.
   * `EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]` is being looked at by @mjsax as KAFKA-9831.
   * `GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]` is a new issue; I created KAFKA-9928 for this, and my gut feeling is that it has the same root cause as KAFKA-9831. (also cc @mjsax)
   
   So I think this PR is good to be merged.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] fantayeneh opened a new pull request #8577: use appropriate fn for readability. (maybe)

2020-04-28 Thread GitBox


fantayeneh opened a new pull request #8577:
URL: https://github.com/apache/kafka/pull/8577


   Using `min`/`max` might make the code a little easier to read.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


guozhangwang commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416944392



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final
  * @param <K> Key type of the data records
  * @param <V> Value type of the data records
  */
-@SuppressWarnings("WeakerAccess")
 public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                      final Collection<KeyValue<K, V>> records,
                                                                      final Properties producerConfig,
                                                                      final Headers headers,
                                                                      final Long timestamp,
-                                                                     final boolean enableTransactions)
-    throws ExecutionException, InterruptedException {
+                                                                     final boolean enableTransactions) {
 
     try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
         if (enableTransactions) {
             producer.initTransactions();
             producer.beginTransaction();
         }
         for (final KeyValue<K, V> record : records) {
-            final Future<RecordMetadata> f = producer.send(
-                new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
-            f.get();
+            producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));

Review comment:
   Previously we waited after sending each record; here we wait only once after sending all records, so it is more efficient.
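
   To make the difference concrete, a standalone sketch (hypothetical topic and config, not the test-utility code itself) contrasting the two approaches:
   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Properties;
   import java.util.concurrent.ExecutionException;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.Producer;
   import org.apache.kafka.clients.producer.ProducerRecord;

   public class BatchProduceSketch {
       public static void main(final String[] args) throws ExecutionException, InterruptedException {
           final Properties config = new Properties();
           config.put("bootstrap.servers", "localhost:9092");
           config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
           config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
           final List<String> values = Arrays.asList("a", "b", "c");

           try (final Producer<String, String> producer = new KafkaProducer<>(config)) {
               // Old style: block on every send, one round trip per record.
               for (final String v : values) {
                   producer.send(new ProducerRecord<>("test-topic", v)).get();
               }
               // New style: fire all sends, then block once until every record is acked.
               for (final String v : values) {
                   producer.send(new ProducerRecord<>("test-topic", v));
               }
               producer.flush();
           }
       }
   }
   ```
   Both variants guarantee the whole batch is produced before the method returns, which is the contract discussed further down this thread.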





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


guozhangwang commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416943907



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
##
@@ -337,8 +336,11 @@ public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws
     TestUtils.waitForCondition(
         () -> {
             try {
-                final ReadOnlyKeyValueStore store =
-                    kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+                final ReadOnlyKeyValueStore store = IntegrationTestUtils.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore());
+
+                if (store == null)
+                    return false;

Review comment:
   ack.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #8204: KAFKA-9633: Ensure ConfigProviders are closed

2020-04-28 Thread GitBox


kkonstantine commented on a change in pull request #8204:
URL: https://github.com/apache/kafka/pull/8204#discussion_r416866064



##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##
@@ -220,6 +220,8 @@ public void stop() {
 
         workerMetricsGroup.close();
         connectorStatusMetricsGroup.close();
+
+        workerConfigTransformer.close();

Review comment:
   Looking at the initialization of `workerConfigTransformer` I see it should be made final.
   
   And then I notice that this is the case for `connectorClientConfigOverridePolicy` and all the class members of `ConnectorStatusMetricsGroup`. @tombentley do you mind tightening these types as well?

##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
##
@@ -98,4 +101,8 @@ public void onCompletion(Throwable error, Void result) {
     HerderRequest request = worker.herder().restartConnector(ttl, connectorName, cb);
     connectorRequests.put(path, request);
 }
+
+public void close() {

Review comment:
   Should we also change this class to implement `AutoCloseable`?
   This can't be used immediately in a try-with-resources clause, but it's probably better to signal the existence of this method at the class level.
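
   A minimal sketch of the suggestion (a hypothetical class standing in for WorkerConfigTransformer): implementing `AutoCloseable` advertises closability at the type level, even when the owner drives the lifecycle rather than a try-with-resources block.
   ```java
   // Hypothetical sketch, not the actual Connect class.
   public class ConfigTransformerSketch implements AutoCloseable {
       // ... scheduled restart requests and other resources would live here ...

       @Override
       public void close() {
           // cancel outstanding requests, release resources, etc.
       }
   }
   ```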





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416947800



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final
  * @param <K> Key type of the data records
  * @param <V> Value type of the data records
  */
-@SuppressWarnings("WeakerAccess")
 public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                      final Collection<KeyValue<K, V>> records,
                                                                      final Properties producerConfig,
                                                                      final Headers headers,
                                                                      final Long timestamp,
-                                                                     final boolean enableTransactions)
-    throws ExecutionException, InterruptedException {
+                                                                     final boolean enableTransactions) {
 
     try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
         if (enableTransactions) {
             producer.initTransactions();
             producer.beginTransaction();
         }
         for (final KeyValue<K, V> record : records) {
-            final Future<RecordMetadata> f = producer.send(
-                new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
-            f.get();
+            producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));

Review comment:
   Thanks. That's what I was asking for confirmation on. I realize now the 
structure of my sentence was ambiguous.
   
   I agree that the method contract is that the batch should be synchronously 
produced, not that each record should be synchronously produced, so this change 
looks good to me.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8542: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…

2020-04-28 Thread GitBox


junrao commented on pull request #8542:
URL: https://github.com/apache/kafka/pull/8542#issuecomment-620880304


   @steverod: Do the JDK 8 and Scala 2.12 tests pass for you locally? Not sure why the Jenkins test failed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe edited a comment on pull request #8569: KIP-551: Expose disk read and write metrics

2020-04-28 Thread GitBox


cmccabe edited a comment on pull request #8569:
URL: https://github.com/apache/kafka/pull/8569#issuecomment-620884289


   > In addition to block-level read/write, would there be a benefit to expose 
file system read/write metrics?
   
   It's better to have that discussion on the mailing list.  This PR is just 
about KIP-551.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #8569: KIP-551: Expose disk read and write metrics

2020-04-28 Thread GitBox


cmccabe commented on pull request #8569:
URL: https://github.com/apache/kafka/pull/8569#issuecomment-620884289


   > In addition to block-level read/write, would there be a benefit to expose 
file system read/write metrics?
   It's better to have that discussion on the mailing list.  This PR is just 
about KIP-551.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei opened a new pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


vvcephei opened a new pull request #8578:
URL: https://github.com/apache/kafka/pull/8578


   The ticket is for a flaky test that failed to clean up topics _after_ the test, which
   isn't strictly necessary for test success.
   
   * altered the "clean up after test" method to never throw an exception
     (after verifying it's always the last invocation inside a finally block,
     so it won't break any test semantics)
   * consolidated the naming of all integration tests' app ids, topics, etc., by introducing
     a new test utility to generate safe, unique, descriptive names.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gwenshap commented on pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test

2020-04-28 Thread GitBox


gwenshap commented on pull request #8518:
URL: https://github.com/apache/kafka/pull/8518#issuecomment-620826970


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


guozhangwang commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416942992



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##
@@ -810,21 +808,9 @@ private void writeInputData(final List<KeyValue<Long, Long>> records) throws Exc
 }
 
 private void verifyStateStore(final KafkaStreams streams,
-                              final Set<KeyValue<Long, Long>> expectedStoreContent) {
-    ReadOnlyKeyValueStore<Long, Long> store = null;
-
-    final long maxWaitingTime = System.currentTimeMillis() + 30L;
-    while (System.currentTimeMillis() < maxWaitingTime) {
-        try {
-            store = streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
-            break;
-        } catch (final InvalidStateStoreException okJustRetry) {
-            try {
-                Thread.sleep(5000L);
-            } catch (final Exception ignore) { }
-        }
-    }
-
+                              final Set<KeyValue<Long, Long>> expectedStoreContent) throws InterruptedException {
+    final ReadOnlyKeyValueStore<Long, Long> store = IntegrationTestUtils
+        .getStore(30L, storeName, streams, QueryableStoreTypes.keyValueStore());

Review comment:
   Ack.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #8554: KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd

2020-04-28 Thread GitBox


kkonstantine commented on a change in pull request #8554:
URL: https://github.com/apache/kafka/pull/8554#discussion_r417055199



##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -281,9 +281,18 @@ private void readToLogEnd() {
     Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
     while (it.hasNext()) {
         Map.Entry<TopicPartition, Long> entry = it.next();
-        if (consumer.position(entry.getKey()) >= entry.getValue())
+        TopicPartition topicPartition = entry.getKey();
+        Long endOffset = entry.getValue();
+        long lastConsumedOffset = consumer.position(topicPartition);
+        if (lastConsumedOffset >= endOffset) {
+            log.trace("Reached end offset {} for {}", endOffset, topicPartition);
             it.remove();
-        else {
+        } else {
+            log.trace(
+                "Behind end offset {} for {}; last-consumed offset is {}",
+                endOffset,
+                topicPartition,
+                lastConsumedOffset);

Review comment:
   ```suggestion
            log.trace("Behind end offset {} for {}; last-consumed offset is {}",
                    endOffset, topicPartition, lastConsumedOffset);
   ```
   nit: multiline calls don't need to be on their own line in AK and tab is equal to 4 spaces (here we need 2 tabs)

##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -281,9 +281,18 @@ private void readToLogEnd() {
     Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
     while (it.hasNext()) {
         Map.Entry<TopicPartition, Long> entry = it.next();
-        if (consumer.position(entry.getKey()) >= entry.getValue())
+        TopicPartition topicPartition = entry.getKey();
+        Long endOffset = entry.getValue();

Review comment:
   Unboxing will happen in the comparison in the `if` branch anyway, so it's probably better to do it early by declaring the type `long` here.
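
   A tiny illustration of the point (hypothetical values, not the Connect code): with `Long`, unboxing happens at each comparison; declaring `long` unboxes once, up front, and surfaces a missing key immediately.
   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class UnboxingSketch {
       public static void main(final String[] args) {
           final Map<String, Long> endOffsets = new HashMap<>();
           endOffsets.put("partition-0", 42L);

           final Long boxed = endOffsets.get("partition-0");  // stays boxed; unboxed inside every comparison
           final long early = endOffsets.get("partition-0");  // unboxed once, here (NPE here if the key were absent)

           System.out.println(boxed >= 42L);  // implicit unboxing at the comparison
           System.out.println(early >= 42L);  // plain primitive comparison
       }
   }
   ```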
   
   

##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -281,9 +281,18 @@ private void readToLogEnd() {
     Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
     while (it.hasNext()) {
         Map.Entry<TopicPartition, Long> entry = it.next();
-        if (consumer.position(entry.getKey()) >= entry.getValue())
+        TopicPartition topicPartition = entry.getKey();
+        Long endOffset = entry.getValue();
+        long lastConsumedOffset = consumer.position(topicPartition);
+        if (lastConsumedOffset >= endOffset) {
+            log.trace("Reached end offset {} for {}", endOffset, topicPartition);

Review comment:
   given that the previous messages say "Reading to ..." maybe it would make sense to say:
   ```suggestion
            log.trace("Read to end offset {} for {}", endOffset, topicPartition);
   ```

##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -281,9 +281,18 @@ private void readToLogEnd() {
     Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
     while (it.hasNext()) {
         Map.Entry<TopicPartition, Long> entry = it.next();
-        if (consumer.position(entry.getKey()) >= entry.getValue())
+        TopicPartition topicPartition = entry.getKey();
+        Long endOffset = entry.getValue();
+        long lastConsumedOffset = consumer.position(topicPartition);
+        if (lastConsumedOffset >= endOffset) {
+            log.trace("Reached end offset {} for {}", endOffset, topicPartition);
             it.remove();
-        else {
+        } else {
+            log.trace(
+                "Behind end offset {} for {}; last-consumed offset is {}",
+                endOffset,
+                topicPartition,
+                lastConsumedOffset);

Review comment:
   Similar to the above, seeing a message that says `read` might be easier to read in context than `consumed`.
   How about:
   `Behind end offset {} for {}; last-read offset is {}`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #8554: KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd

2020-04-28 Thread GitBox


C0urante commented on pull request #8554:
URL: https://github.com/apache/kafka/pull/8554#issuecomment-620992508


   Thanks @kkonstantine! These all seem like reasonable suggestions and I've 
applied them all. Ready for the next round when you have time.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #8554: KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd

2020-04-28 Thread GitBox


C0urante commented on a change in pull request #8554:
URL: https://github.com/apache/kafka/pull/8554#discussion_r417070806



##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -281,9 +281,18 @@ private void readToLogEnd() {
     Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
     while (it.hasNext()) {
         Map.Entry<TopicPartition, Long> entry = it.next();
-        if (consumer.position(entry.getKey()) >= entry.getValue())
+        TopicPartition topicPartition = entry.getKey();
+        Long endOffset = entry.getValue();

Review comment:
   Fair enough; added.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8254: KIP-557: Add Emit On Change Support

2020-04-28 Thread GitBox


vvcephei commented on pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-620847005


   Whew! System tests passed: 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-04-28--001.1588064884--ConcurrencyPractitioner--EMIT-ON-CHANGE--ddbf2cf/report.html



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


guozhangwang commented on pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#issuecomment-620879654


   Merged to trunk.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #8582: KAFKA-9932: Don't load configs from ZK when the log has already been loaded

2020-04-28 Thread GitBox


ijuma opened a new pull request #8582:
URL: https://github.com/apache/kafka/pull/8582


   If a broker contains 8k replicas, we would previously issue 8k ZK calls to 
retrieve topic
   configs when processing the first LeaderAndIsr request. That should 
translate to 0 after
   these changes.
   
   Credit to @junrao for identifying the problem.
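   
   The change itself is in the Scala broker code, but the idea can be sketched generically (hypothetical names throughout; this is not the actual broker code): consult the already-loaded log before going to ZooKeeper.
   ```java
   import java.util.Optional;
   import java.util.Properties;

   class TopicConfigLookupSketch {
       interface LogManager { Optional<Properties> configIfLoaded(String topic); }
       interface ZkClientStub { Properties fetchTopicConfig(String topic); }

       // At most one ZooKeeper round trip, and only when the log isn't already loaded locally.
       static Properties topicConfig(final String topic, final LogManager logs, final ZkClientStub zk) {
           return logs.configIfLoaded(topic).orElseGet(() -> zk.fetchTopicConfig(topic));
       }
   }
   ```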
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gwenshap commented on pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test

2020-04-28 Thread GitBox


gwenshap commented on pull request #8518:
URL: https://github.com/apache/kafka/pull/8518#issuecomment-620932278


   No failures, nice :)
   Great update @lbradstreet and thanks for contributing.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8580: KAFKA-9832: fix attempt to commit non-running tasks

2020-04-28 Thread GitBox


vvcephei commented on pull request #8580:
URL: https://github.com/apache/kafka/pull/8580#issuecomment-620962357


   Hey @mjsax, do you have time for a quick review?
   
   This bug seems to have been introduced by 
https://github.com/apache/kafka/pull/8440/files#r407722022 , which attempts to 
commit all non-corrupted tasks. Some of these tasks may not be running. The 
Task implementations will throw an exception if we attempt to `prepareCommit` 
while not in state RUNNING (or RESTORING).
   
   We could make the task more permissive, so that it would ignore the commit 
to a task that is not in a committable state. I opted instead to filter out 
only the tasks in committable states, though. I was concerned that if we make 
prepareCommit more permissive, we might just complicate the rest of the commit 
lifecycle, because then the rest of it would also have to be permissive, etc.
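   
   In code form, the chosen option reads roughly like this sketch (hypothetical `Task`/`State` types standing in for the Streams internals):
   ```java
   import java.util.List;
   import java.util.stream.Collectors;

   class CommitFilterSketch {
       enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }
       interface Task { State state(); }

       // Only tasks in a committable state are offered to prepareCommit().
       static List<Task> committableTasks(final List<Task> tasks) {
           return tasks.stream()
                       .filter(t -> t.state() == State.RUNNING || t.state() == State.RESTORING)
                       .collect(Collectors.toList());
       }
   }
   ```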
   
   Thanks for the very nice test in your prior PR; it was easy to extend it to 
cover this case and also to add the regression test.
   
   WDYT?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #8442: KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses

2020-04-28 Thread GitBox


kkonstantine commented on pull request #8442:
URL: https://github.com/apache/kafka/pull/8442#issuecomment-620964908


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #8442: KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses

2020-04-28 Thread GitBox


kkonstantine commented on pull request #8442:
URL: https://github.com/apache/kafka/pull/8442#issuecomment-620964776


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8579: KAFKA-9930: Prevent ReplicaFetcherThread from throwing UnknownTopicOrPartitionException upon topic creation and deletion.

2020-04-28 Thread GitBox


ijuma commented on a change in pull request #8579:
URL: https://github.com/apache/kafka/pull/8579#discussion_r417044882



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -382,6 +382,11 @@ abstract class AbstractFetcherThread(name: String,
     "that the partition is being moved")
   partitionsWithError += topicPartition
 
+case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
+  warn(s"Remote broker does not host the partition $topicPartition, which could indicate " +
+    "that the partition is being created or deleted.")

Review comment:
   Should it be `info` if we think this is expected? That would still show in the logs.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8578:
URL: https://github.com/apache/kafka/pull/8578#discussion_r417044713



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -145,12 +162,12 @@ public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, fina
         }
     }
 
-    public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
-        driver.cleanUp();
+    public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
         try {
+            driver.cleanUp();
             cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
-        } catch (final InterruptedException e) {
-            throw new RuntimeException(e);
+        } catch (final RuntimeException | InterruptedException e) {
+            LOG.warn("Ignoring failure to clean test state", e);
         }

Review comment:
   I share your concern, but I'm not sure about the conclusion.
   
   Yes, if there is state (such as a topic) that leaks from one test to the next, it can certainly cause difficult-to-debug failures. However, there are multiple things we can do to prevent/mitigate it:
   * delete state after tests (not to leave any garbage behind)
   * delete state before the tests (to ensure a clean slate for the test)
   * choose unique names for all resources of each test (this is where the other part of this PR comes in)
   
   Any one of these should be sufficient to prevent state from leaking between tests, and most of these tests do all three. In other words, we have 3x redundancy guarding against such test pollution. Of the three measures, the cleanup _after_ tests is actually the most optional: tests can't tolerate failures in the cleanup _before_ (because it also creates necessary topics), and choosing unique topic names per test is bulletproof and easy to fix (once we know what the problem is).
   
   Whether the cleanup is part of the test or in the `@After` method, the outcome is the same: if the method throws an exception, the test will fail. The downside of After is that it requires you to store the topic names in mutable class-level fields, which actually makes it more awkward to choose unique names per test.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


guozhangwang commented on a change in pull request #8578:
URL: https://github.com/apache/kafka/pull/8578#discussion_r417015562



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
##
@@ -215,13 +216,13 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store
 }
 
 private Properties streamsConfiguration() {
-    final String applicationId = "streamsApp";
+    final String safeTestName = safeUniqueTestName(getClass(), testName);
     final Properties config = new Properties();
     config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
-    config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId + name.getMethodName());
+    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
     config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
     config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
-    config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+    config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());

Review comment:
   Is it safer to encode the appID as part of the dir path to avoid collision?

##
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -145,12 +162,12 @@ public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, fina
         }
     }
 
-    public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
-        driver.cleanUp();
+    public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
         try {
+            driver.cleanUp();
             cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
-        } catch (final InterruptedException e) {
-            throw new RuntimeException(e);
+        } catch (final RuntimeException | InterruptedException e) {
+            LOG.warn("Ignoring failure to clean test state", e);
         }

Review comment:
   req: Actually deleting topics after a test is critical for some tests: I've encountered cases where the same topics were reused mistakenly across different test cases within a single class. But I feel it is better to put the topic deletion in the `@After` function while leaving `cleanUp()` as part of the test function itself, as sketched below.
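
   A minimal sketch of that split, assuming the `CLUSTER` rule and `DEFAULT_TIMEOUT` constant visible elsewhere in this thread:
   ```java
   @After
   public void deleteTopics() throws InterruptedException {
       // Not swallowed: if topics linger past the timeout, the test run reports it.
       CLUSTER.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
   }
   ```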

##
File path: streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
##
@@ -156,7 +157,7 @@ public void shouldPreservePartitionTimeOnKafkaStreamRestart() {
     assertThat(lastRecordedTimestamp, is(5000L));
 } finally {
     kafkaStreams.close();
-    cleanStateAfterTest(CLUSTER, kafkaStreams);
+    quietlyCleanStateAfterTest(CLUSTER, kafkaStreams);

Review comment:
   nit: we can put kafkaStreams in a try block.

##
File path: streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
##
@@ -106,7 +108,7 @@ public void before() {
 
     consumerConfiguration = new Properties();
     consumerConfiguration.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-    consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, name.getMethodName() + "-consumer");
+    consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, safeTestName + "-consumer");

Review comment:
   Somewhere else it is set as `"group-" + safeTestName`; is this change intentional?

##
File path: streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
##
@@ -243,7 +251,7 @@ public void shouldRecoverBufferAfterShutdown() {
 
 } finally {
     driver.close();
-    cleanStateAfterTest(CLUSTER, driver);
+    quietlyCleanStateAfterTest(CLUSTER, driver);

Review comment:
   nit: ditto here, we can put `driver` in the try block. And ditto elsewhere.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #8204: KAFKA-9633: Ensure ConfigProviders are closed

2020-04-28 Thread GitBox


kkonstantine commented on pull request #8204:
URL: https://github.com/apache/kafka/pull/8204#issuecomment-620971604


   JDK8 build failed on a relevant test: 
   
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2007/testReport/junit/org.apache.kafka.connect.runtime/WorkerTest/testStartAndStopConnector/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #8581: MINOR: Fix typo and rephrase content in docs

2020-04-28 Thread GitBox


showuon opened a new pull request #8581:
URL: https://github.com/apache/kafka/pull/8581


   1. fix typo: `atleast` -> `at least`
   2. add missing `--` to be consistent
   3. rephrase a sentence to make it clearer:
   
   before: `LinkedIn is currently running JDK 1.8 u5 (looking to upgrade to a newer version) with the G1 collector`
   
   This may misguide users into running JDK 1.8 u5, while JDK 1.8 u251 is already released and includes many important bug fixes. I rephrased it as below:
   
   after: `At the time when we write this, LinkedIn is running JDK 1.8 u5 (looking to upgrade to a newer version) with the G1 collector`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-28 Thread GitBox


ableegoldman commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620909428


   One unrelated failure: 
`MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0

2020-04-28 Thread GitBox


ning2008wisc commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r416995009



##
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##
@@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) {
     Checkpoint.unwrapGroup(record.sourcePartition()),
     System.currentTimeMillis() - record.timestamp());
 }
+
+private void refreshIdleConsumerGroupOffset() {
+    Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+        .describeConsumerGroups(consumerGroups).describedGroups();
+
+    for (String group : consumerGroups) {
+        try {
+            if (consumerGroupsDesc.get(group) == null) {
+                // if consumerGroupsDesc does not contain this group, it should be the new consumer
+                // group created at source cluster and its offsets should be sync-ed to target
+                newConsumerGroup.add(group);
+                continue;
+            }
+            ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
+            // sync offset to the target cluster only if the state of current consumer group is idle or dead
+            ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
+            if (consumerGroupState.equals(ConsumerGroupState.EMPTY) || consumerGroupState.equals(ConsumerGroupState.DEAD)) {
+                idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group)
+                    .partitionsToOffsetAndMetadata().get().entrySet());
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
+        }
+    }
+}
+
+Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
+    Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
+
+    // first, sync offsets for the idle consumers at target
+    for (Map.Entry<String, Set<Entry<TopicPartition, OffsetAndMetadata>>> group : idleConsumerGroupsOffset.entrySet()) {
+        String consumerGroupId = group.getKey();
+        // for each idle consumer at target, read the checkpoints (converted upstream offset)
+        // from the pre-populated map
+        Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId);
+
+        if (convertedUpstreamOffset == null) continue;
+
+        Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
+        for (Entry<TopicPartition, OffsetAndMetadata> entry : group.getValue()) {

Review comment:
   If I am understanding right, are you asking about this scenario: consumer A is consuming from Topic `x` and `y`, and MM is replicating the offsets of consumer A for Topic `x` and `y`. What if consumer A starts consuming from Topic `x`, `y`, and `z`, where `z` is a new topic; why does MM not replicate the offset of consumer A for Topic `z`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] zhaohaidao commented on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

2020-04-28 Thread GitBox


zhaohaidao commented on pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#issuecomment-620915590


   @abbccdda Hi, the PR is updated. Could you continue to review it?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8578:
URL: https://github.com/apache/kafka/pull/8578#discussion_r417040202



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
##
@@ -215,13 +216,13 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store
 }
 
 private Properties streamsConfiguration() {
-    final String applicationId = "streamsApp";
+    final String safeTestName = safeUniqueTestName(getClass(), testName);
     final Properties config = new Properties();
     config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
-    config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId + name.getMethodName());
+    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
     config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
     config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
-    config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+    config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());

Review comment:
   I wavered on this point, but each time you call `tempDirectory`, it 
should give you a completely new directory:
   ```
   * Create a temporary relative directory in the default temporary-file directory with the given prefix.
   *
   * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix
   ```
   
   So the prefix seems to be nice only for documenting which test a directory 
is for, not for enforcing any kind of test/directory uniqueness. I felt like it 
added more noise than value, so I just dropped all the prefixes. 
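
   The JDK primitive underneath behaves the same way, which is easy to check (a plain-JDK sketch, not Kafka's TestUtils): the prefix is only a label, and uniqueness comes from the generated suffix.
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;

   public class TempDirSketch {
       public static void main(final String[] args) throws IOException {
           final Path a = Files.createTempDirectory("kafka-");
           final Path b = Files.createTempDirectory("kafka-");
           System.out.println(a);            // e.g. /tmp/kafka-2719593206348518696
           System.out.println(b);            // a different suffix on every call
           System.out.println(a.equals(b));  // false
       }
   }
   ```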





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8578:
URL: https://github.com/apache/kafka/pull/8578#discussion_r417046624



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
##
@@ -156,7 +157,7 @@ public void 
shouldPreservePartitionTimeOnKafkaStreamRestart() {
 assertThat(lastRecordedTimestamp, is(5000L));
 } finally {
 kafkaStreams.close();
-cleanStateAfterTest(CLUSTER, kafkaStreams);
+quietlyCleanStateAfterTest(CLUSTER, kafkaStreams);

Review comment:
   Unfortunately, we generally can't use try-with-resources for these 
tests, since that makes the `kafkaStreams` reference out of scope for the 
finally block. We'd have to allow a reference to kafkaStreams to escape the try 
{} block to reference it either in finally or in an After method, which is just 
as messy as it currently is, if not more. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8574: KAFKA-9925: decorate pseudo-topics with app id

2020-04-28 Thread GitBox


vvcephei commented on pull request #8574:
URL: https://github.com/apache/kafka/pull/8574#issuecomment-620971180


   Thanks, @arkins! Shame is a powerful motivator :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


vvcephei commented on pull request #8578:
URL: https://github.com/apache/kafka/pull/8578#issuecomment-620970911


   Thanks for the review, @guozhangwang . I've addressed (or responded to) your 
comments.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-28 Thread GitBox


vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620976620


   Cherry-picked to 2.5 as 9e2785fd1ba0ed16604e01058bae6b60ff9f3d96



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8578:
URL: https://github.com/apache/kafka/pull/8578#discussion_r417047054



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
##
@@ -106,7 +108,7 @@ public void before() {
 
     consumerConfiguration = new Properties();
     consumerConfiguration.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-    consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, name.getMethodName() + "-consumer");
+    consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, safeTestName + "-consumer");

Review comment:
   ```suggestion
    consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
   ```
   
   No, I just got tired of messing with every test's idiosyncrasies.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-28 Thread GitBox


vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620973163


   Cherry-pick for 2.5 in progress...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bseenu commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0

2020-04-28 Thread GitBox


bseenu commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r417002126



##
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##
@@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) {
     Checkpoint.unwrapGroup(record.sourcePartition()),
     System.currentTimeMillis() - record.timestamp());
 }
+
+private void refreshIdleConsumerGroupOffset() {
+    Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+        .describeConsumerGroups(consumerGroups).describedGroups();
+
+    for (String group : consumerGroups) {
+        try {
+            if (consumerGroupsDesc.get(group) == null) {
+                // if consumerGroupsDesc does not contain this group, it should be the new consumer
+                // group created at source cluster and its offsets should be sync-ed to target
+                newConsumerGroup.add(group);
+                continue;
+            }
+            ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
+            // sync offset to the target cluster only if the state of current consumer group is idle or dead
+            ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
+            if (consumerGroupState.equals(ConsumerGroupState.EMPTY) || consumerGroupState.equals(ConsumerGroupState.DEAD)) {
+                idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group)
+                    .partitionsToOffsetAndMetadata().get().entrySet());
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
+        }
+    }
+}
+
+Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
+    Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
+
+    // first, sync offsets for the idle consumers at target
+    for (Map.Entry<String, Set<Entry<TopicPartition, OffsetAndMetadata>>> group : idleConsumerGroupsOffset.entrySet()) {
+        String consumerGroupId = group.getKey();
+        // for each idle consumer at target, read the checkpoints (converted upstream offset)
+        // from the pre-populated map
+        Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId);
+
+        if (convertedUpstreamOffset == null) continue;
+
+        Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
+        for (Entry<TopicPartition, OffsetAndMetadata> entry : group.getValue()) {

Review comment:
   Yes
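
   For context, the group-state check in the diff above can be reproduced with
   the public AdminClient API alone. A minimal sketch (the bootstrap address and
   group name are placeholders, not taken from the PR):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.ConsumerGroupState;

public class GroupStateCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            ConsumerGroupDescription desc = admin
                .describeConsumerGroups(Collections.singleton("my-group"))
                .describedGroups()
                .get("my-group")
                .get();
            // Offsets are only safe to sync when no member is actively committing,
            // i.e. the group is EMPTY or DEAD, matching the check in the diff.
            boolean idle = desc.state() == ConsumerGroupState.EMPTY
                || desc.state() == ConsumerGroupState.DEAD;
            System.out.println("group idle: " + idle);
        }
    }
}
```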









[GitHub] [kafka] efeg opened a new pull request #8579: KAFKA-9930: Prevent ReplicaFetcherThread from throwing UnknownTopicOrPartitionException upon topic creation and deletion.

2020-04-28 Thread GitBox


efeg opened a new pull request #8579:
URL: https://github.com/apache/kafka/pull/8579


   When does UnknownTopicOrPartitionException typically occur?
* Upon a topic creation, a follower broker of a new partition starts 
replica fetcher before the prospective leader broker of the new partition 
receives the leadership information from the controller (see 
[KAFKA-6221](https://issues.apache.org/jira/browse/KAFKA-6221)).
* Upon a topic deletion, a follower broker of a to-be-deleted partition 
starts replica fetcher after the leader broker of the to-be-deleted partition 
processes the deletion information from the controller.
* As expected, clusters with frequent topic creation and deletion report 
UnknownTopicOrPartitionException with relatively higher frequency.
   
   What is the impact?
* Exception tracking systems flag the error logs containing 
UnknownTopicOrPartitionException as exceptions. This results in a lot of 
noise for a transient issue that is expected to recover by itself and is a 
natural consequence of Kafka's asynchronous state propagation.
   
   Why not move it to a lower than warn-level log?
* Despite typically being a transient issue, 
UnknownTopicOrPartitionException may also indicate a real problem if it doesn't 
resolve itself after a short period of time. To ensure detection of such scenarios, 
we set the log level to warn.
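
   A hedged illustration of the warn-level handling described above (the fetch
   method here is invented for the example and is not the actual
   ReplicaFetcherThread code):

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransientFetchErrorLogging {
    private static final Logger log = LoggerFactory.getLogger(TransientFetchErrorLogging.class);

    // Hypothetical fetch that fails while leadership metadata is still propagating.
    static void fetchFromLeader(TopicPartition tp) {
        throw new UnknownTopicOrPartitionException("Leader does not yet host " + tp);
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("new-topic", 0);
        try {
            fetchFromLeader(tp);
        } catch (UnknownTopicOrPartitionException e) {
            // warn rather than error: transient by design, but still visible
            // in the logs if it keeps recurring and needs investigation
            log.warn("Received UNKNOWN_TOPIC_OR_PARTITION for {}; will retry", tp, e);
        }
    }
}
```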
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] vvcephei opened a new pull request #8580: KAFKA-9832: fix attempt to commit non-running tasks

2020-04-28 Thread GitBox


vvcephei opened a new pull request #8580:
URL: https://github.com/apache/kafka/pull/8580


   Fixes an attempt to commit potentially non-running tasks while recovering 
from task corruption.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ijuma commented on pull request #8364: MINOR: Partition is under reassignment when adding and removing

2020-04-25 Thread GitBox


ijuma commented on pull request #8364:
URL: https://github.com/apache/kafka/pull/8364#issuecomment-619439283











[GitHub] [kafka] guozhangwang commented on pull request #7862: KAFKA-9246:Update Heartbeat timeout when ConsumerCoordinator commit offset

2020-04-25 Thread GitBox


guozhangwang commented on pull request #7862:
URL: https://github.com/apache/kafka/pull/7862#issuecomment-619456093


   cc @hachikuji .
   
   @dengziming could you rebase the current PR?







[GitHub] [kafka] C0urante opened a new pull request #8554: KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd

2020-04-25 Thread GitBox


C0urante opened a new pull request #8554:
URL: https://github.com/apache/kafka/pull/8554


   [Jira ticket](https://issues.apache.org/jira/browse/KAFKA-9919)
   
   Just some simple logging additions that should make life easier when the 
worker can't get caught up to the end of an internal topic.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] guozhangwang commented on a change in pull request #8103: KAFKA-7061: KIP-280 Enhanced log compaction

2020-04-25 Thread GitBox


guozhangwang commented on a change in pull request #8103:
URL: https://github.com/apache/kafka/pull/8103#discussion_r415156119



##
File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
##
@@ -109,6 +109,19 @@
 public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a 
message will remain " +
 "ineligible for compaction in the log. Only applicable for logs that 
are being compacted.";
 
+public static final String COMPACTION_STRATEGY_CONFIG = 
"compaction.strategy";
+public static final String COMPACTION_STRATEGY_DOC = "The retention 
strategy to use when compacting the log. " +
+   "Only applicable for logs that are being compacted. " + 
+   "By default the compaction strategy is set to \"offset\" where the 
latest offset for the key is retained. " + 
+   "For \"header\" strategy, the value provided by the producer in the 
record header will be used to determine " +
+   "the latest record for the key. For \"timestamp\" strategy, the record 
timestamp will be used to determine the " +
+   "latest record for the key. So setting the strategy to anything other 
than \"offset\" will replace the offset " +
+   "when calculating which records to retain for the value (i.e. provided 
by the producer) matching " + 
+   "the given strategy name (case-insensitive). The valid strategies are 
\"offset\", \"timestamp\" and \"header\".";
+
+public static final String COMPACTION_STRATEGY_HEADER_KEY_CONFIG = 
"compaction.strategy.header";
+public static final String COMPACTION_STRATEGY_HEADER_KEY_DOC = "The 
header key for the compaction. Only applicable for compaction strategy header.";

Review comment:
   nit: ... for "header" compaction strategy.
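
   For a concrete picture of the proposed "header" strategy, a sketch of how a
   producer might populate the configured header (the header key "version", the
   topic name, and the encoding are assumptions for illustration; the strategy
   itself is only proposed in this PR):

```java
import java.nio.ByteBuffer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class HeaderStrategyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                new ProducerRecord<>("compacted-topic", "key", "value");
            // Under the proposal, the cleaner would compare this value across
            // records with the same key and keep the record with the largest one.
            record.headers().add("version", ByteBuffer.allocate(8).putLong(42L).array());
            producer.send(record);
        }
    }
}
```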

##
File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
##
@@ -109,6 +109,19 @@
 public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a 
message will remain " +
 "ineligible for compaction in the log. Only applicable for logs that 
are being compacted.";
 
+public static final String COMPACTION_STRATEGY_CONFIG = 
"compaction.strategy";
+public static final String COMPACTION_STRATEGY_DOC = "The retention 
strategy to use when compacting the log. " +
+   "Only applicable for logs that are being compacted. " + 
+   "By default the compaction strategy is set to \"offset\" where the 
latest offset for the key is retained. " + 
+   "For \"header\" strategy, the value provided by the producer in the 
record header will be used to determine " +
+   "the latest record for the key. For \"timestamp\" strategy, the record 
timestamp will be used to determine the " +
+   "latest record for the key. So setting the strategy to anything other 
than \"offset\" will replace the offset " +

Review comment:
   nit: The sentence "So setting the strategy ... matching the given 
strategy name" reads a bit confusing here. I think we only need to state 
when the change of the policy takes effect (the next time compaction 
is triggered by the log cleaner thread), and emphasize that for "timestamp" and 
"header" we would always still retain the last record.

##
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##
@@ -68,90 +120,164 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: 
String = "MD5") extend
 
   /**
* The number of bytes of space each entry uses (the number of bytes in the 
hash plus an 8 byte offset)
+   * This evaluates to the number of bytes in the hash plus 8 bytes for the 
offset
+   * and, if applicable, another 8 bytes for non-offset compact strategy (set 
in the init method).
*/
-  val bytesPerEntry = hashSize + 8
-  
+  var bytesPerEntry = hashSize + longByteSize
+
   /**
* The maximum number of entries this map can contain
*/
-  val slots: Int = memory / bytesPerEntry
+  var slots: Int = memory / bytesPerEntry
+
+  /* compact strategy */
+  private var compactionStrategy: CompactionStrategy = null
+
+  /* header key for the Strategy header to look for */
+  private var headerKey: String = ""
+
+  /**
+   * Initialize the map with the topic compact strategy
+   * @param strategy The compaction strategy
+   * @param headerKey The header key if the compaction strategy is set to 
header
+   * @param cleanerThreadId The cleaner thread id
+   * @param topicPartitionName The topic partition name
+   */
+  override def init(strategy: String = Defaults.CompactionStrategy, headerKey: 
String = "", cleanerThreadId: Int = -1, topicPartitionName: String = "") {
+// set the log indent for the topic partition
+this.logIdent = s"[OffsetMap-$cleanerThreadId $topicPartitionName]: "
+info(s"Initializing OffsetMap with compaction strategy '$strategy' and 
header key '$headerKey'")
+
+// Change the salt used for key hashing making all existing keys 
unfindable.
+this.entries = 0
+

[GitHub] [kafka] cmccabe commented on pull request #8528: System tests should use --bootstrap-server rather than --zookeeper when testing new Kafka versions

2020-04-25 Thread GitBox


cmccabe commented on pull request #8528:
URL: https://github.com/apache/kafka/pull/8528#issuecomment-619440794


   ok to test







[GitHub] [kafka] gwenshap commented on pull request #8554: KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd

2020-04-25 Thread GitBox


gwenshap commented on pull request #8554:
URL: https://github.com/apache/kafka/pull/8554#issuecomment-619456920


   ok to test







[GitHub] [kafka] C0urante commented on pull request #8554: KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd

2020-04-25 Thread GitBox


C0urante commented on pull request #8554:
URL: https://github.com/apache/kafka/pull/8554#issuecomment-619449604


   @ncliang, @gharris1727, @aakashnshah would you mind taking a look when you 
have a chance?







[GitHub] [kafka] jiameixie commented on pull request #8555: KAFKA-9920:Fix NetworkDegradeTest.test_rate test error

2020-04-25 Thread GitBox


jiameixie commented on pull request #8555:
URL: https://github.com/apache/kafka/pull/8555#issuecomment-619464996


   @abbccdda @mjsax  PTAL, thanks







[GitHub] [kafka] ijuma commented on pull request #8533: KAFKA-9589: Fixed bug in V2 log validator tests

2020-04-25 Thread GitBox


ijuma commented on pull request #8533:
URL: https://github.com/apache/kafka/pull/8533#issuecomment-619439574


   retest this please







[GitHub] [kafka] jiameixie opened a new pull request #8555: KAFKA-9920:Fix NetworkDegradeTest.test_rate test error

2020-04-25 Thread GitBox


jiameixie opened a new pull request #8555:
URL: https://github.com/apache/kafka/pull/8555


   The test case 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_rate.
rate_limit_kbit=100.device_name=eth0.task_name=rate-1000-latency-50.latency_ms=50
   failed with the error "Expected most of the measured rates to be within an order
   of magnitude of target 100. This means `tc` did not limit the bandwidth as expected."
   This was because the rate limit did not take effect immediately after starting.
   
   Change-Id: I2de1d3fc696e11b411986bb2e3ce43f074fbac4a
   Signed-off-by: Jiamei Xie 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] mimaison commented on pull request #8224: KAFKA-9704: Fix the issue z/OS won't let us resize file when mmap.

2020-04-24 Thread GitBox


mimaison commented on pull request #8224:
URL: https://github.com/apache/kafka/pull/8224#issuecomment-618893486


   retest this please







[GitHub] [kafka] vvcephei commented on pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-23 Thread GitBox


vvcephei commented on pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#issuecomment-618768435


   Also, kicked off 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3915/







[GitHub] [kafka] ableegoldman commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-23 Thread GitBox


ableegoldman commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414227525



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set 
allSourceTopics,
 allTasks, clientStates, numStandbyReplicas());
 
 final TaskAssignor taskAssignor;
-if (highAvailabilityEnabled) {
-if (lagComputationSuccessful) {
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-assignmentConfigs);
-} else {
-log.info("Failed to fetch end offsets for changelogs, will 
return previous assignment to clients and "
- + "trigger another rebalance to retry.");
-setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-taskAssignor = new StickyTaskAssignor(clientStates, allTasks, 
statefulTasks, assignmentConfigs, true);
-}
+if (!lagComputationSuccessful) {

Review comment:
   Fine with me (although it does slightly detract from the opt-out 
possibility). WDYT about adding a retry backoff though? I'm a bit concerned we 
might just end up stuck in a loop of useless rebalancing, and waiting the full 
`probing.rebalance.interval` doesn't feel right either

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##
@@ -41,8 +42,8 @@
 import static 
org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
 
 public final class AssignorConfiguration {
-public static final String HIGH_AVAILABILITY_ENABLED_CONFIG = 
"internal.high.availability.enabled";
-private final boolean highAvailabilityEnabled;
+public static final String INTERNAL_TASK_ASSIGNOR_CLASS = 
"internal.task.assignor.class";

Review comment:
   Should we put this with the other Streams internal configs? And/or 
follow the pattern of prefix+suffixing with `__` ?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -95,6 +94,11 @@ public boolean assign() {
 
 assignStatelessActiveTasks();
 
+log.info("Decided on assignment: " +

Review comment:
   I'm all for useful logging  

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/PriorTaskAssignor.java
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+public class PriorTaskAssignor implements TaskAssignor {
+private final StickyTaskAssignor delegate;
+
+public PriorTaskAssignor() {
+delegate = new StickyTaskAssignor(true);

Review comment:
   Thanks for the improvement, this feels a lot nicer









[GitHub] [kafka] ConcurrencyPractitioner commented on pull request #7884: [KAFKA-8522] Streamline tombstone and transaction marker removal

2020-04-23 Thread GitBox


ConcurrencyPractitioner commented on pull request #7884:
URL: https://github.com/apache/kafka/pull/7884#issuecomment-618768078


   Reported in JIRA here: https://issues.apache.org/jira/browse/INFRA-20182







[GitHub] [kafka] lbradstreet commented on a change in pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test

2020-04-23 Thread GitBox


lbradstreet commented on a change in pull request #8518:
URL: https://github.com/apache/kafka/pull/8518#discussion_r414250681



##
File path: tests/kafkatest/tests/core/downgrade_test.py
##
@@ -67,11 +67,18 @@ def setup_services(self, kafka_version, compression_types, 
security_protocol):
  version=kafka_version)
 self.producer.start()
 
+static_membership = kafka_version == DEV_BRANCH or kafka_version >= 
LATEST_2_3

Review comment:
   @abbccdda friendly ping on this









[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-23 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414193901



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/PriorTaskAssignor.java
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+public class PriorTaskAssignor implements TaskAssignor {
+private final StickyTaskAssignor delegate;
+
+public PriorTaskAssignor() {
+delegate = new StickyTaskAssignor(true);

Review comment:
   The StickyTaskAssignor is capable of satisfying the PriorTaskAssignor's 
contract, so we can just delegate to it. The important thing is that we now 
have two separately defined contracts:
   1. return all previous tasks and assign the rest (PriorTaskAssignor)
   2. strike a balance between stickiness and balance (StickyTaskAssignor)
   
   The fact that the implementation is shared is an ... implementation detail.









[GitHub] [kafka] leonardge commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR

2020-04-24 Thread GitBox


leonardge commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r414467090



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1068,7 +1068,12 @@ class KafkaController(val config: KafkaConfig,
 val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp 
=> controllerContext.isReplicaOnline(leaderBroker, tp) &&
   controllerContext.partitionsBeingReassigned.isEmpty &&
   !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
-  controllerContext.allTopics.contains(tp.topic))
+  controllerContext.allTopics.contains(tp.topic) &&
+  
PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(
+controllerContext.partitionReplicaAssignment(tp),
+controllerContext.partitionLeadershipInfo(tp).leaderAndIsr.isr,
+controllerContext.liveBrokerIds.toSet).nonEmpty

Review comment:
   Done! After using the mentioned implementation the code block got 
cluttered, so I extracted it into a helper method.









[GitHub] [kafka] d8tltanc opened a new pull request #8544: [WIP]KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-04-24 Thread GitBox


d8tltanc opened a new pull request #8544:
URL: https://github.com/apache/kafka/pull/8544


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] lbradstreet commented on a change in pull request #8467: MINOR: reduce allocations in log start and recovery checkpoints

2020-04-23 Thread GitBox


lbradstreet commented on a change in pull request #8467:
URL: https://github.com/apache/kafka/pull/8467#discussion_r414190233



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -1003,9 +1003,17 @@ class LogManager(logDirs: Seq[File],
   /**
* Map of log dir to logs by topic and partitions in that dir
*/
-  private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
-(this.currentLogs.toList ++ this.futureLogs.toList).toMap
-  .groupBy { case (_, log) => log.parentDir }
+  def logsByDir: Map[String, Map[TopicPartition, Log]] = {
+// This code is called often by checkpoint processes and is written in a 
way that reduces
+// allocations and CPU with many topic partitions.
+// When changing this code please measure the changes with 
org.apache.kafka.jmh.server.CheckpointBench
+val byDir = new mutable.AnyRefMap[String, 
mutable.AnyRefMap[TopicPartition, Log]]()
+def addToDir(tp: TopicPartition, log: Log): Unit = {
+  byDir.getOrElseUpdate(log.parentDir, new 
mutable.AnyRefMap[TopicPartition, Log]()).put(tp, log)
+}
+currentLogs.foreach { case (tp, log) => addToDir(tp, log) }
+futureLogs.foreach { case (tp, log) => addToDir(tp, log) }

Review comment:
   AFAICT, it's easy enough to do something like this:
   ```
   pool.forEach((k,v) => f(k,v))
   ```
   But then we appear to be back to allocating tuples.
   
   It doesn't appear easy to do something like:
   ```
   pool.forEach(kv => f(kv.getKey, kv.getValue))
   ```
   since it's not able to create a BiConsumer for you.
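
   For reference, the Java shape of the pattern under discussion, as a
   self-contained sketch rather than the actual LogManager code:
   ConcurrentHashMap.forEach takes a BiConsumer, so the key and value reach the
   callback without a per-entry Map.Entry or tuple allocation:

```java
import java.util.concurrent.ConcurrentHashMap;

public class ForEachDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> pool = new ConcurrentHashMap<>();
        pool.put("a", 1);
        pool.put("b", 2);
        // The BiConsumer receives key and value directly; contrast with
        // iterating entrySet(), which materializes a Map.Entry per element.
        pool.forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```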









[GitHub] [kafka] ableegoldman commented on a change in pull request #8248: KAFKA-9501: convert between active and standby without closing stores

2020-04-23 Thread GitBox


ableegoldman commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r414247204



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
##
@@ -148,6 +161,82 @@ public void 
shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables() th
 );
 }
 
+@Test
+public void shouldRecycleStateFromStandbyTaskPromotedToActiveTask() throws 
Exception {

Review comment:
   Wasn't really sure where to put this test, but this class seemed close 
enough. Verified that this does indeed fail on trunk without this fix









[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-23 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414251894



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
##
@@ -1,349 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import kafka.utils.MockTime;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KafkaStreamsWrapper;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.LagInfo;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.processor.StateRestoreListener;
-import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Category({IntegrationTest.class})
-public class LagFetchIntegrationTest {

Review comment:
   Ok, after some reflection, I feel better about my alternative proposal, 
so I've restored this test and just set the assignor to `PriorTaskAssignor`.









[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-23 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414195680



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##
@@ -515,84 +520,114 @@ public void 
shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() th
 // the app is supposed to copy all 60 records into the output topic
 // the app commits after each 10 records per partition, and thus will 
have 2*5 uncommitted writes
 //
-// a GC pause gets inject after 20 committed and 30 uncommitted 
records got received
-// -> the GC pause only affects one thread and should trigger a 
rebalance
+// a stall gets injected after 20 committed and 30 uncommitted records 
got received
+// -> the stall only affects one thread and should trigger a rebalance
 // after rebalancing, we should read 40 committed records (even if 50 
record got written)
 //
 // afterwards, the "stalling" thread resumes, and another rebalance 
should get triggered
 // we write the remaining 20 records and verify to read 60 result 
records
 
 try (
-final KafkaStreams streams1 = getKafkaStreams(false, "appDir1", 1, 
eosConfig);
-final KafkaStreams streams2 = getKafkaStreams(false, "appDir2", 1, 
eosConfig)
+final KafkaStreams streams1 = getKafkaStreams("streams1", false, 
"appDir1", 1, eosConfig);
+final KafkaStreams streams2 = getKafkaStreams("streams2", false, 
"appDir2", 1, eosConfig)

Review comment:
   I added an argument to the KafkaStreams builder to set the dummy host 
name. Previously, it was always "dummy" even though we had two instances, which 
resulted in the metadata map only containing one entry, even though there were 
two nodes in the cluster. I'm not sure if this was a cause of flakiness (since 
it seems it would be non-deterministic), but it's definitely not _right_.









[GitHub] [kafka] ableegoldman commented on pull request #8248: KAFKA-9501: convert between active and standby without closing stores

2020-04-23 Thread GitBox


ableegoldman commented on pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#issuecomment-618726059


   Finally ready for review @vvcephei @cadonna @guozhangwang 







[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-23 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414201107



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -161,6 +165,9 @@ public void before() throws Exception {
 props.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
 props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
 
+builder = new StreamsBuilder();
+builder.stream("source");

Review comment:
   Good call, done









[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-23 Thread GitBox


vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-618729924


   test this please







[GitHub] [kafka] steverod opened a new pull request #8543: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…

2020-04-23 Thread GitBox


steverod opened a new pull request #8543:
URL: https://github.com/apache/kafka/pull/8543


   …ng. (#8469)
   
   In KAFKA-9826, a log whose first dirty offset was past the start of the 
active segment and past the last cleaned point resulted in an endless cycle of 
picking the segment to clean and discarding it. Though this didn't interfere 
with cleaning other log segments, it kept the log cleaner thread continuously 
busy (potentially wasting CPU and impacting other running threads) and filled 
the logs with lots of extraneous messages.
   
   This was determined to be because the active segment was getting mistakenly 
picked for cleaning, and because the logSegments code handles (start == end) 
cases only for (start, end) on a segment boundary: the intent is to return a 
empty list, but if they're not on a segment boundary, the routine returns that 
segment.
   
   This fix has two parts:
   
   It changes logSegments to handle start==end by returning an empty List 
always.
   
   It changes the definition of calculateCleanableBytes to not consider 
anything past the UncleanableOffset; previously, it would potentially shift the 
UncleanableOffset to match the firstDirtyOffset even if the firstDirtyOffset 
was past the firstUncleanableOffset. This has no real effect now in the context 
of the fix for (1) but it makes the code read more like the model that the code 
is attempting to follow.
   
   These changes require modifications to a few test cases that handled this 
particular test case; they were introduced in the context of KAFKA-8764. Those 
situations are now handled elsewhere in code, but the tests themselves allowed 
a DirtyOffset in the active segment, and expected an active segment to be 
selected for cleaning.
   
   Reviewer: Jun Rao 
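
   A minimal sketch of the intended logSegments contract described above,
   modeling segments as a sorted map of base offsets rather than the actual
   Scala Log code (all names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class SegmentRangeSketch {
    // start == end must always yield an empty view, even when the bounds fall
    // inside a segment rather than on a segment boundary; the bug was that the
    // unaligned case returned the enclosing (possibly active) segment instead.
    static List<Long> segmentBaseOffsets(ConcurrentNavigableMap<Long, String> segments,
                                         long from, long to) {
        if (from == to) {
            return new ArrayList<>();
        }
        Long floor = segments.floorKey(from);
        long lower = floor != null ? floor : from;
        return new ArrayList<>(segments.subMap(lower, true, to, false).keySet());
    }

    public static void main(String[] args) {
        ConcurrentNavigableMap<Long, String> segments = new ConcurrentSkipListMap<>();
        segments.put(0L, "segment-0");
        segments.put(100L, "segment-100"); // active segment
        // An unaligned first dirty offset inside the active segment:
        System.out.println(segmentBaseOffsets(segments, 150L, 150L)); // []
    }
}
```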
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] steverod opened a new pull request #8542: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…

2020-04-23 Thread GitBox


steverod opened a new pull request #8542:
URL: https://github.com/apache/kafka/pull/8542


   …ng. (#8469)
   
   In KAFKA-9826, a log whose first dirty offset was past the start of the 
active segment and past the last cleaned point resulted in an endless cycle of 
picking the segment to clean and discarding it. Though this didn't interfere 
with cleaning other log segments, it kept the log cleaner thread continuously 
busy (potentially wasting CPU and impacting other running threads) and filled 
the logs with lots of extraneous messages.
   
   This was determined to be because the active segment was getting mistakenly 
picked for cleaning, and because the logSegments code handles (start == end) 
cases only for (start, end) on a segment boundary: the intent is to return a 
empty list, but if they're not on a segment boundary, the routine returns that 
segment.
   
   This fix has two parts:
   
   It changes logSegments to handle start==end by returning an empty List 
always.
   
   It changes the definition of calculateCleanableBytes to not consider 
anything past the UncleanableOffset; previously, it would potentially shift the 
UncleanableOffset to match the firstDirtyOffset even if the firstDirtyOffset 
was past the firstUncleanableOffset. This has no real effect now in the context 
of the fix for (1) but it makes the code read more like the model that the code 
is attempting to follow.
   
   These changes require modifications to a few test cases that handled this 
particular test case; they were introduced in the context of KAFKA-8764. Those 
situations are now handled elsewhere in code, but the tests themselves allowed 
a DirtyOffset in the active segment, and expected an active segment to be 
selected for cleaning.
   
   Reviewer: Jun Rao 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ConcurrencyPractitioner commented on pull request #8254: KIP-557: Add Emit On Change Support

2020-04-23 Thread GitBox


ConcurrencyPractitioner commented on pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-618766447


   ok to test







[GitHub] [kafka] ConcurrencyPractitioner removed a comment on pull request #8254: KIP-557: Add Emit On Change Support

2020-04-23 Thread GitBox


ConcurrencyPractitioner removed a comment on pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-618687469











[GitHub] [kafka] ConcurrencyPractitioner commented on pull request #7884: [KAFKA-8522] Streamline tombstone and transaction marker removal

2020-04-23 Thread GitBox


ConcurrencyPractitioner commented on pull request #7884:
URL: https://github.com/apache/kafka/pull/7884#issuecomment-618766295


   @junrao I did try on another PR. Looks like it didn't work. I will file a JIRA.







[GitHub] [kafka] mjsax commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-23 Thread GitBox


mjsax commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414205108



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 internalTopologyBuilder,
 
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+final int numStreamThreads;
+if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+log.warn("Overriding number of StreamThreads to zero for 
global-only topology");

Review comment:
   I don't think this should be WARN but an INFO. There is nothing "wrong" 
and thus nothing to warn about?

##
File path: checkstyle/suppressions.xml
##
@@ -181,7 +181,7 @@
   files="StreamsPartitionAssignor.java"/>
 
 
+  
files="(AssignorConfiguration|InternalTopologyBuilder|KafkaStreams|ProcessorStateManager|StreamsPartitionAssignor|StreamThread|TaskManager).java"/>

Review comment:
   Do we really need to add this exception? How much work would it be to 
reduce the complexity of `KafkaStreams`?

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 internalTopologyBuilder,
 
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+final int numStreamThreads;
+if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+log.warn("Overriding number of StreamThreads to zero for 
global-only topology");
+numStreamThreads = 0;
+} else {
+numStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+}
+
 // create the stream thread, global update thread, and cleanup thread
-threads = new 
StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+threads = new StreamThread[numStreamThreads];
+
+final ProcessorTopology globalTaskTopology = 
internalTopologyBuilder.buildGlobalStateTopology();
+final boolean hasGlobalTopology = globalTaskTopology != null;
+
+if (numStreamThreads == 0 && !hasGlobalTopology) {
+log.error("Must subscribe to at least one source topic or global 
table");
+throw new IllegalArgumentException("Topology has no stream threads 
and no global threads");

Review comment:
   Do we really want to do this? I understand that an empty topology does 
not make sense, and it would be appropriate to log a WARN -- but do we 
need/want to reject it?
   
   Also, should we instead throw an `InvalidTopologyException`? Furthermore, 
should we add a similar check to `StreamsBuilder.build()` to raise this error 
even earlier (we would still need this check though).
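
   A sketch of the earlier check suggested here, under stated assumptions: the
   helper is hypothetical, and `TopologyException` (from
   `org.apache.kafka.streams.errors`) stands in for the exception type to throw:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyException;

public class EmptyTopologyCheck {
    // Hypothetical helper: reject an empty topology at build time instead of
    // waiting for the KafkaStreams constructor to discover it.
    static Topology buildNonEmpty(StreamsBuilder builder) {
        Topology topology = builder.build();
        if (topology.describe().subtopologies().isEmpty()
                && topology.describe().globalStores().isEmpty()) {
            throw new TopologyException("Topology has no source topics or global tables");
        }
        return topology;
    }

    public static void main(String[] args) {
        try {
            buildNonEmpty(new StreamsBuilder());
        } catch (TopologyException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```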

##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -883,6 +887,50 @@ public void statefulTopologyShouldCreateStateDirectory() 
throws Exception {
 startStreamsAndCheckDirExists(topology, true);
 }
 
+@Test
+public void shouldThrowIllegalArgumentExceptionOnEmptyTopology() {
+assertThrows(
+IllegalArgumentException.class,
+() -> new KafkaStreams(new StreamsBuilder().build(), props, 
supplier, time)
+);
+}
+
+@Test
+public void shouldNotCreateStreamThreadsForGlobalOnlyTopology() {
+final StreamsBuilder builder = new StreamsBuilder();
+builder.globalTable("anyTopic");
+final KafkaStreams streams = new KafkaStreams(builder.build(), props, 
supplier, time);
+
+assertThat(streams.threads.length, equalTo(0));
+}
+
+@Test
+public void shouldNotTransitToErrorStateWithGlobalOnlyTopology() throws 
InterruptedException {
+final StreamsBuilder builder = new StreamsBuilder();
+builder.globalTable("anyTopic");
+final KafkaStreams streams = new KafkaStreams(builder.build(), props, 
supplier, time);
+streams.setStateListener((newState, oldState) -> {
+if (newState.equals(State.ERROR)) {
+throw new AssertionError("Should not have transitioned to 
ERROR state with no stream threads");

Review comment:
   Is the state listener executed on the correct thread (ie, main testing 
thread?) that thus would really make the test fail? From my current 
understanding it would be called by the `GlobalThread` and thus the global 
thread would die instead (this would lead to a timeout below I guess?)

##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread 
thread, final boolean termin
 
 @Test
 public void 

[GitHub] [kafka] ijuma commented on a change in pull request #8467: MINOR: reduce allocations in log start and recovery checkpoints

2020-04-23 Thread GitBox


ijuma commented on a change in pull request #8467:
URL: https://github.com/apache/kafka/pull/8467#discussion_r414296613



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -1003,9 +1003,17 @@ class LogManager(logDirs: Seq[File],
   /**
* Map of log dir to logs by topic and partitions in that dir
*/
-  private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
-(this.currentLogs.toList ++ this.futureLogs.toList).toMap
-  .groupBy { case (_, log) => log.parentDir }
+  def logsByDir: Map[String, Map[TopicPartition, Log]] = {
+// This code is called often by checkpoint processes and is written in a 
way that reduces
+// allocations and CPU with many topic partitions.
+// When changing this code please measure the changes with 
org.apache.kafka.jmh.server.CheckpointBench
+val byDir = new mutable.AnyRefMap[String, 
mutable.AnyRefMap[TopicPartition, Log]]()
+def addToDir(tp: TopicPartition, log: Log): Unit = {
+  byDir.getOrElseUpdate(log.parentDir, new 
mutable.AnyRefMap[TopicPartition, Log]()).put(tp, log)
+}
+currentLogs.foreach { case (tp, log) => addToDir(tp, log) }
+futureLogs.foreach { case (tp, log) => addToDir(tp, log) }

Review comment:
   Why do you say that the following would allocate tuples?
   
   ```scala
   def foreachEntry(f: (K, V) => Unit): Unit = {
   pool.forEach((k, v) => f(k, v))
 }
   ```
   
   Did you measure it? Looking at the code, I don't see any tuples.









[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-23 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414188592



##
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##
@@ -1146,4 +1146,13 @@ private static byte checkRange(final byte i) {
 }
 };
 }
+
+@SafeVarargs
+public static  Set union(final Supplier> constructor, final 
Set... set) {

Review comment:
   I've been wanting this for a while, so I just decided to add it.
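
   The diff only shows the signature; a usage sketch follows, where the method
   body is a straightforward reconstruction rather than a copy of the PR's
   implementation:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Supplier;

public class UnionDemo {
    @SafeVarargs
    static <T> Set<T> union(final Supplier<Set<T>> constructor, final Set<T>... set) {
        final Set<T> result = constructor.get();
        for (final Set<T> s : set) {
            result.addAll(s);
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> a = new HashSet<>(Arrays.asList("x", "y"));
        Set<String> b = new HashSet<>(Arrays.asList("y", "z"));
        // The caller picks the concrete set type for the result:
        Set<String> merged = union(TreeSet::new, a, b);
        System.out.println(merged); // [x, y, z]
    }
}
```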

##
File path: build.gradle
##
@@ -236,8 +236,10 @@ subprojects {
 def logStreams = new HashMap()
 beforeTest { TestDescriptor td ->
   def tid = testId(td)
+  // truncate the file name if it's too long
   def logFile = new File(
-  "${projectDir}/build/reports/testOutput/${tid}.test.stdout")
+  "${projectDir}/build/reports/testOutput/${tid.substring(0, 
Math.min(tid.size(),240))}.test.stdout"

Review comment:
   Necessary because the test name that JUnit generates for the 
parameterized StreamsPartitionAssignorTest is slightly too long. I have no way 
to shorten it because the thing that pushes it over is the fact that there are 
two package names in the parameterized method name, and there's no control over 
the format of the test name itself. So, I decided just to truncate the file 
name instead, which is almost certainly still unique for pretty much any test.

##
File path: clients/src/test/java/org/apache/kafka/test/TestUtils.java
##
@@ -361,9 +361,9 @@ public static void waitForCondition(final TestCondition 
testCondition, final lon
  * avoid transient failures due to slow or overloaded machines.
  */
 public static void waitForCondition(final TestCondition testCondition, 
final long maxWaitMs, Supplier conditionDetailsSupplier) throws 
InterruptedException {
-String conditionDetailsSupplied = conditionDetailsSupplier != null ? 
conditionDetailsSupplier.get() : null;
-String conditionDetails = conditionDetailsSupplied != null ? 
conditionDetailsSupplied : "";
 retryOnExceptionWithTimeout(maxWaitMs, () -> {
+String conditionDetailsSupplied = conditionDetailsSupplier != null 
? conditionDetailsSupplier.get() : null;
+String conditionDetails = conditionDetailsSupplied != null ? 
conditionDetailsSupplied : "";

Review comment:
   This is pointless unless we evaluate it inside the lambda.
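
   To spell out why, a simplified stand-in for the utility (not the actual
   TestUtils code): the details supplier is invoked only on the failure path
   inside the retry, so the message reflects the state after the last attempt
   instead of a value captured before the first:

```java
import java.util.function.Supplier;

public class LazyConditionMessage {
    interface TestCondition {
        boolean conditionMet();
    }

    static void waitForCondition(TestCondition condition, int maxAttempts,
                                 Supplier<String> details) throws InterruptedException {
        for (int i = 0; i < maxAttempts; i++) {
            if (condition.conditionMet()) {
                return;
            }
            Thread.sleep(10L);
        }
        // Evaluated only here, after the final attempt, so it sees fresh state.
        throw new AssertionError("Condition not met: " + details.get());
    }

    public static void main(String[] args) throws InterruptedException {
        final long[] observed = {0};
        waitForCondition(() -> ++observed[0] >= 3, 5,
            () -> "last observed value was " + observed[0]);
        System.out.println("condition met after " + observed[0] + " checks");
    }
}
```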

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set 
allSourceTopics,
 allTasks, clientStates, numStandbyReplicas());
 
 final TaskAssignor taskAssignor;
-if (highAvailabilityEnabled) {
-if (lagComputationSuccessful) {
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-assignmentConfigs);
-} else {
-log.info("Failed to fetch end offsets for changelogs, will 
return previous assignment to clients and "
- + "trigger another rebalance to retry.");
-setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-taskAssignor = new StickyTaskAssignor(clientStates, allTasks, 
statefulTasks, assignmentConfigs, true);
-}
+if (!lagComputationSuccessful) {
+log.info("Failed to fetch end offsets for changelogs, will return 
previous assignment to clients and "
+ + "trigger another rebalance to retry.");
+setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
+taskAssignor = new PriorTaskAssignor();

Review comment:
   Just to clarify everyone's roles, I added a new assignor whose only 
behavior is to return all previously owned tasks, and then assign any unowned 
tasks.
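   
   In other words, roughly the following, sketched here with simplified stand-in types rather than the actual assignor internals:
   
   ```
   import java.util.Comparator;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   
   final class PriorAssignmentSketch {
   
       // Each client keeps exactly the tasks it previously owned; any task nobody
       // owned goes to the currently least-loaded client. Assumes at least one client.
       static Map<String, Set<String>> assign(final Map<String, Set<String>> previouslyOwned,
                                              final Set<String> allTasks) {
           final Map<String, Set<String>> assignment = new HashMap<>();
           final Set<String> unowned = new HashSet<>(allTasks);
           previouslyOwned.forEach((client, tasks) -> {
               assignment.put(client, new HashSet<>(tasks));
               unowned.removeAll(tasks);
           });
           for (final String task : unowned) {
               assignment.values().stream()
                   .min(Comparator.comparingInt(Set::size))
                   .ifPresent(tasks -> tasks.add(task));
           }
           return assignment;
       }
   }
   ```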


[GitHub] [kafka] thspinto commented on pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0

2020-04-23 Thread GitBox


thspinto commented on pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#issuecomment-618732263


   Could you post your configuration file @bseenu?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc commented on pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0

2020-04-23 Thread GitBox


ning2008wisc commented on pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#issuecomment-618732741


   Hi @bseenu , it would be very helpful if you could elaborate on the operational sequence of your test case: e.g., which cluster did your consumer initially consume from? Did you switch your consumer from the primary to the backup cluster? What messages did your consumer consume before and after the cluster switch? Thanks



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei opened a new pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-23 Thread GitBox


vvcephei opened a new pull request #8541:
URL: https://github.com/apache/kafka/pull/8541


   * add a config to set the TaskAssignor
   * set the default assignor to HighAvailabilityTaskAssignor
   * fix broken tests
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-23 Thread GitBox


vvcephei commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414214864



##
File path: checkstyle/suppressions.xml
##
@@ -181,7 +181,7 @@
   files="StreamsPartitionAssignor.java"/>
 
 
+  
files="(AssignorConfiguration|InternalTopologyBuilder|KafkaStreams|ProcessorStateManager|StreamsPartitionAssignor|StreamThread|TaskManager).java"/>

Review comment:
   NPathComplexity is a tough one to work around. We'd wind up having to 
move some blocks of logic to separate helper classes.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #8525: KAFKA-9885; Evict last members of a group when the maximum allowed is reached

2020-04-24 Thread GitBox


dajac commented on pull request #8525:
URL: https://github.com/apache/kafka/pull/8525#issuecomment-618992102


   cc @hachikuji @abbccdda 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet opened a new pull request #8545: MINOR: smoke test for jmh benchmarks

2020-04-24 Thread GitBox


lbradstreet opened a new pull request #8545:
URL: https://github.com/apache/kafka/pull/8545


   Adds a smoke test that runs a single jmh benchmark on each CI build. Without this change, the jmh benchmarks could silently break when we change our build setup.
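   
   For reference, a smoke run of this kind executes a benchmark shaped roughly like the following; this is an illustrative skeleton, not the benchmark wired up in this PR:
   
   ```
   import java.util.concurrent.TimeUnit;
   
   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.annotations.BenchmarkMode;
   import org.openjdk.jmh.annotations.Mode;
   import org.openjdk.jmh.annotations.OutputTimeUnit;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.State;
   
   @State(Scope.Benchmark)
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.NANOSECONDS)
   public class ExampleBenchmark {
   
       // Returning the result prevents the JIT from eliminating the loop as dead code.
       @Benchmark
       public long measureSum() {
           long sum = 0;
           for (int i = 0; i < 1_000; i++) {
               sum += i;
           }
           return sum;
       }
   }
   ```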



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on pull request #8467: MINOR: reduce allocations in log start and recovery checkpoints

2020-04-24 Thread GitBox


lbradstreet commented on pull request #8467:
URL: https://github.com/apache/kafka/pull/8467#issuecomment-619035931


   I have updated the benchmark results in the OP with the new `forEach` code. 
It is strictly better than the old `foreach` code.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #8224: KAFKA-9704: Fix the issue z/OS won't let us resize file when mmap.

2020-04-24 Thread GitBox


mimaison commented on pull request #8224:
URL: https://github.com/apache/kafka/pull/8224#issuecomment-619041569


   Test failure is unrelated, merging



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8364: MINOR: Partition is under reassignment when adding and removing

2020-04-24 Thread GitBox


ijuma commented on a change in pull request #8364:
URL: https://github.com/apache/kafka/pull/8364#discussion_r414551549



##
File path: core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
##
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.admin.TopicCommand.PartitionDescription
+import org.apache.kafka.clients.admin.PartitionReassignment
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.TopicPartitionInfo
+import org.junit.Assert._
+import org.junit.Test
+import scala.collection.JavaConverters._

Review comment:
   This needs to be changed to `CollectionConverters` and the branch needs 
to be rebased from master.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-24 Thread GitBox


vvcephei commented on pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#issuecomment-619051055


   29 system tests failed, so I'll pin them to the StickyTaskAssignor and fix 
them in follow-up PRs.
   
   
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-04-24--001.1587711201--vvcephei--kafka-6145-task-assignor-flag--97228a5/report.html



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #8467: MINOR: reduce allocations in log start and recovery checkpoints

2020-04-24 Thread GitBox


lbradstreet commented on a change in pull request #8467:
URL: https://github.com/apache/kafka/pull/8467#discussion_r414592818



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -1003,9 +1003,17 @@ class LogManager(logDirs: Seq[File],
   /**
* Map of log dir to logs by topic and partitions in that dir
*/
-  private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
-(this.currentLogs.toList ++ this.futureLogs.toList).toMap
-  .groupBy { case (_, log) => log.parentDir }
+  def logsByDir: Map[String, Map[TopicPartition, Log]] = {
+// This code is called often by checkpoint processes and is written in a way that reduces
+// allocations and CPU with many topic partitions.
+// When changing this code please measure the changes with org.apache.kafka.jmh.server.CheckpointBench
+val byDir = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Log]]()
+def addToDir(tp: TopicPartition, log: Log): Unit = {
+  byDir.getOrElseUpdate(log.parentDir, new mutable.AnyRefMap[TopicPartition, Log]()).put(tp, log)
+}
+currentLogs.foreach { case (tp, log) => addToDir(tp, log) }
+futureLogs.foreach { case (tp, log) => addToDir(tp, log) }

Review comment:
   @ijuma I had mistakenly thought it was taking a tuple there, and maybe 
that was just the way Scala converted a lambda to a BiConsumer, and the results 
had regressed back to close to what I was seeing with the iterator version.
   
   When I ran it, I saw:
   ```
   1140074.937 ± 75151.914B/op
   def foreachEntry(f: (K, V) => Unit): Unit = {
     pool.forEach((k, v) => f(k, v))
   }
   
   1044120.722 ± 1676.137B/op (included in an above comment)
   pool.forEach(new BiConsumer[K,V] {
     override def accept(t: K, u: V): Unit = f(t, u)
   })
   ```
   
   Strangely I have just re-run the BiConsumer version and it returned 
`1188097.413 ±  1639.537B/op`. I'm not sure why it's regressed from what I 
saw on a previous run. I am OK with using the version with a lambda if you are. 
I'm not sure I will have time to investigate it further, and it's still a good 
improvement compared to the version that used `foreach`.
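   
   For readers less at home in the Scala above, the grouping pattern from the diff translates to roughly this Java (illustration only; `Log` is a minimal stand-in for `kafka.log.Log`):
   
   ```
   import java.util.HashMap;
   import java.util.Map;
   import java.util.function.BiConsumer;
   
   final class LogsByDirSketch {
   
       // Stand-in for kafka.log.Log: all we need here is the parent directory.
       static final class Log {
           final String parentDir;
           Log(final String parentDir) { this.parentDir = parentDir; }
       }
   
       // One pass over each source map; computeIfAbsent plays the role of Scala's
       // getOrElseUpdate, so no intermediate flattened collection is allocated.
       static Map<String, Map<String, Log>> logsByDir(final Map<String, Log> currentLogs,
                                                      final Map<String, Log> futureLogs) {
           final Map<String, Map<String, Log>> byDir = new HashMap<>();
           final BiConsumer<String, Log> addToDir = (tp, log) ->
               byDir.computeIfAbsent(log.parentDir, d -> new HashMap<>()).put(tp, log);
           currentLogs.forEach(addToDir);
           futureLogs.forEach(addToDir);
           return byDir;
       }
   }
   ```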





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8467: MINOR: reduce allocations in log start and recovery checkpoints

2020-04-24 Thread GitBox


ijuma commented on pull request #8467:
URL: https://github.com/apache/kafka/pull/8467#issuecomment-619037866


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #8525: KAFKA-9885; Evict last members of a group when the maximum allowed is reached

2020-04-24 Thread GitBox


dajac commented on pull request #8525:
URL: https://github.com/apache/kafka/pull/8525#issuecomment-619001260


   Related to #8437.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-25 Thread GitBox


abbccdda commented on pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#issuecomment-619474699


   @avalsa Thanks for the updates, this LGTM. Will let @guozhangwang take 
another look.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

2020-04-25 Thread GitBox


abbccdda commented on a change in pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#discussion_r415209254



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##
@@ -312,6 +312,7 @@ public void buildAndOptimizeTopology(final Properties props) {
 graphNodePriorityQueue.offer(graphNode);
 }
 }
+internalTopologyBuilder.validateCoPartition();

Review comment:
   nit: validateCopartition

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -633,6 +634,45 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
 copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
 }
 
+public void validateCoPartition() {
+final List<Set<String>> copartitionGroups =
+copartitionSourceGroups
+.stream()
+.map(sourceGroup -> sourceGroup
+.stream()
+.flatMap(node -> nodeToSourceTopics.get(node).stream())
+.collect(Collectors.toSet())
+).collect(Collectors.toList());
+for (final Set<String> coPartition : copartitionGroups) {
+final Map<String, InternalTopicProperties> coPartitionProperties = new HashMap<>();

Review comment:
   Let's be consistent and use `copartition` instead of `coPartition`

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -633,6 +634,45 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
 copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
 }
 
+public void validateCoPartition() {
+final List<Set<String>> copartitionGroups =
+copartitionSourceGroups
+.stream()
+.map(sourceGroup -> sourceGroup
+.stream()
+.flatMap(node -> nodeToSourceTopics.get(node).stream())
+.collect(Collectors.toSet())
+).collect(Collectors.toList());
+for (final Set<String> coPartition : copartitionGroups) {
+final Map<String, InternalTopicProperties> coPartitionProperties = new HashMap<>();

Review comment:
   Since we are only going to verify the number of partitions, I think we could just use `Integer` as the value type

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##
@@ -312,6 +312,7 @@ public void buildAndOptimizeTopology(final Properties props) {
 graphNodePriorityQueue.offer(graphNode);
 }
 }
+internalTopologyBuilder.validateCoPartition();

Review comment:
   One question: since we do the verification in the topology builder, is there any validation code in a later stage that could be removed?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -633,6 +634,45 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
 copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
 }
 
+public void validateCoPartition() {
+final List<Set<String>> copartitionGroups =
+copartitionSourceGroups
+.stream()
+.map(sourceGroup -> sourceGroup
+.stream()
+.flatMap(node -> nodeToSourceTopics.get(node).stream())
+.collect(Collectors.toSet())
+).collect(Collectors.toList());
+for (final Set<String> coPartition : copartitionGroups) {
+final Map<String, InternalTopicProperties> coPartitionProperties = new HashMap<>();
+internalTopicNamesWithProperties.forEach((topic, prop) -> {
+if (coPartition.contains(topic) && prop.getNumberOfPartitions().isPresent()) {
+coPartitionProperties.put(topic, prop);
+}
+});
+if (coPartition.size() == coPartitionProperties.size()) {

Review comment:
   Could you clarify why we need this equality check?
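   
   For context, the invariant this validation is ultimately enforcing can be sketched as follows (plain types only; `numberOfPartitions` is a hypothetical pre-resolved map, not the builder's actual state):
   
   ```
   import java.util.Map;
   import java.util.Objects;
   import java.util.Set;
   import java.util.stream.Collectors;
   
   final class CopartitionCheckSketch {
   
       // All topics in a copartition group must have the same partition count,
       // otherwise a join across them cannot pair partitions one-to-one.
       static void validate(final Set<String> copartitionGroup,
                            final Map<String, Integer> numberOfPartitions) {
           final Set<Integer> counts = copartitionGroup.stream()
               .map(numberOfPartitions::get)
               .filter(Objects::nonNull)
               .collect(Collectors.toSet());
           if (counts.size() > 1) {
               throw new IllegalStateException("Copartitioned topics " + copartitionGroup
                   + " declare different partition counts: " + counts);
           }
       }
   }
   ```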





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8364: MINOR: Partition is under reassignment when adding and removing

2020-04-25 Thread GitBox


ijuma commented on pull request #8364:
URL: https://github.com/apache/kafka/pull/8364#issuecomment-619470164


   Unrelated failures for JDK 8:
   
   > kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback
   > kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback
   > kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition
   > kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #7709: MINOR: Generated classes should have standard equals

2020-04-25 Thread GitBox


cmccabe commented on pull request #7709:
URL: https://github.com/apache/kafka/pull/7709#issuecomment-619481476


   Superseded by #8539 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #7886: KAFKA-9353: Add groupInstanceId to DescribeGroup for better visibility

2020-04-26 Thread GitBox


dengziming commented on pull request #7886:
URL: https://github.com/apache/kafka/pull/7886#issuecomment-619492636


   cc @hachikuji , PTAL.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #7862: KAFKA-9246:Update Heartbeat timeout when ConsumerCoordinator commit offset

2020-04-25 Thread GitBox


dengziming commented on pull request #7862:
URL: https://github.com/apache/kafka/pull/7862#issuecomment-619470669


   > cc @hachikuji .
   > 
   > @dengziming could you rebase the current PR?
   
   Thanks, done.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe opened a new pull request #8556: MINOR: Add a duplicate() method to Message classes

2020-04-25 Thread GitBox


cmccabe opened a new pull request #8556:
URL: https://github.com/apache/kafka/pull/8556


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] big-andy-coates commented on pull request #8483: KAFKA-9865: Expose output topic names from TopologyTestDriver

2020-04-26 Thread GitBox


big-andy-coates commented on pull request #8483:
URL: https://github.com/apache/kafka/pull/8483#issuecomment-619509083


   Give a man a chance! ;)
   
   On Fri, 24 Apr 2020 at 18:12, Matthias J. Sax wrote:
   
   > *@mjsax* commented on this pull request.
   > --
   >
   > In streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java:
   >
   > > @@ -855,6 +856,20 @@ public void advanceWallClockTime(final Duration advance) {
   >  return new TestOutputTopic<>(this, topicName, keyDeserializer, valueDeserializer);
   >  }
   >
   > +/**
   > + * Get all the names of all the topics to which records have been output.
   > + * 
   > + * Call this method after piping the input into the test driver to retrieve the full set of topics the topology
   > + * produced records to.
   > + * 
   > + * The returned set of topic names includes changelog, repartition and sink topic names.
   > + *
   > + * @return the set of output topic names.
   > + */
   > +public final Set<String> getOutputTopicNames() {
   >
   > @big-andy-coates Can you update the method name according to the KIP discussion?
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-24 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414208673



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##
@@ -41,132 +54,107 @@
 import static org.easymock.EasyMock.replay;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
-import org.easymock.EasyMock;
-import org.junit.Test;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 
 public class HighAvailabilityTaskAssignorTest {

Review comment:
   I made a bunch of changes to this test, because it was pretty brittle 
with respect to changes in the HighAvailabilityTaskAssignor. For context, this 
is the second time I've touched the assignment code since we introduced the 
HATA, and it's the second time I've had to deal with irrelevant test failures 
in this class.
   
   First, I replaced the ClientState mocks with "real" ClientStates, 
constructed to represent the desired scenario for each test. Mocks are really 
more appropriate for isolating a component from _external_ components (like 
mocking a remote service). Mocking data types leads to verifying that a 
specific set of queries happens against the data type, which is likely to break 
any time the logic under test changes in any way. Another problem with 
data-type mocks is that they can violate the invariants of the data type 
itself. For example, you can mock a list that both `isEmpty` and contains 
items. In our case, we threw NPEs in the assignor that could never happen in 
production when the mocked assigned/standby tasks didn't agree with the 
assigned tasks or the stateful assigned tasks weren't mocked to agree with the 
lags. Now, we just construct a ClientState for each client, representing the 
desired scenario and make assertions on the resulting assignment.
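   
   To make the data-type-mock pitfall concrete, here is a contrived EasyMock example (not from this PR) of a mock violating an invariant that no real collection could:
   
   ```
   import static org.easymock.EasyMock.expect;
   import static org.easymock.EasyMock.mock;
   import static org.easymock.EasyMock.replay;
   
   import java.util.List;
   
   final class MockedDataTypePitfall {
   
       @SuppressWarnings("unchecked")
       static List<String> impossibleList() {
           final List<String> tasks = mock(List.class);
           // A real List can never be empty and contain an element at the same time,
           // but code under test "verified" against this mock passes anyway.
           expect(tasks.isEmpty()).andStubReturn(true);
           expect(tasks.contains("task-0_0")).andStubReturn(true);
           replay(tasks);
           return tasks;
       }
   }
   ```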
   
   Second, the tests as written rely heavily on shared mutable fields inserted 
into shared mutable collections to build the assignor. This can be a good way 
to minimize the text inside the test method, which lets readers focus on the 
proper logic of the test itself. However, it makes it harder to understand the 
full context of a test, and it also raises the possibility of tests polluting 
each others' environments. Since in this particular case localizing all the setup code is about as compact as factoring it out, I went ahead and minimized the shared fields and eliminated the mutability, so the tests are self-contained.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



