[GitHub] [kafka] bbejeck commented on pull request #8789: MINOR: Fix the javadoc broken links of streams

2020-06-04 Thread GitBox


bbejeck commented on pull request #8789:
URL: https://github.com/apache/kafka/pull/8789#issuecomment-638886703


   Ok to test.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #8789: MINOR: Fix the javadoc broken links of streams

2020-06-04 Thread GitBox


bbejeck commented on pull request #8789:
URL: https://github.com/apache/kafka/pull/8789#issuecomment-638945282


   Java 11 failed with `java.lang.OutOfMemoryError: unable to create native 
thread: possibly out of memory or process/resource limits reached`







[GitHub] [kafka] ableegoldman commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness

2020-06-04 Thread GitBox


ableegoldman commented on a change in pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#discussion_r435374125



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
##
@@ -300,20 +302,71 @@ public void 
shouldNotHaveUnfulfilledQuotaWhenActiveTaskSizeGreaterEqualThanCapac
 @Test
 public void shouldAddTasksWithLatestOffsetToPrevActiveTasks() {
 final Map taskOffsetSums = 
Collections.singletonMap(TASK_0_1, Task.LATEST_OFFSET);
-client.addPreviousTasksAndOffsetSums(taskOffsetSums);
+client.addPreviousTasksAndOffsetSums("c1", taskOffsetSums);
 client.initializePrevTasks(Collections.emptyMap());
 assertThat(client.prevActiveTasks(), 
equalTo(Collections.singleton(TASK_0_1)));
 assertThat(client.previousAssignedTasks(), 
equalTo(Collections.singleton(TASK_0_1)));
 assertTrue(client.prevStandbyTasks().isEmpty());
 }
 
+@Test
+public void shouldReturnPreviousStatefulTasksForConsumer() {
+client.addPreviousTasksAndOffsetSums("c1", 
Collections.singletonMap(TASK_0_1, Task.LATEST_OFFSET));
+client.addPreviousTasksAndOffsetSums("c2", 
Collections.singletonMap(TASK_0_2, 0L));
+client.addPreviousTasksAndOffsetSums("c3", Collections.emptyMap());
+
+client.initializePrevTasks(Collections.emptyMap());
+client.computeTaskLags(
+UUID_1,
+mkMap(
+mkEntry(TASK_0_1, 1_000L),
+mkEntry(TASK_0_2, 1_000L)
+)
+);
+
+assertThat(client.previousTasksForConsumer("c1"), 
equalTo(mkSortedSet(TASK_0_1)));
+assertThat(client.previousTasksForConsumer("c2"), 
equalTo(mkSortedSet(TASK_0_2)));
+assertTrue(client.previousTasksForConsumer("c3").isEmpty());
+}
+
+@Test
+public void 
shouldReturnPreviousStatefulTasksForConsumerWhenLagIsNotComputed() {
+client.addPreviousTasksAndOffsetSums("c1", 
Collections.singletonMap(TASK_0_1, 1000L));
+client.initializePrevTasks(Collections.emptyMap());
+
+assertThat(client.previousTasksForConsumer("c1"), 
equalTo(mkSortedSet(TASK_0_1)));
+}
+
+@Test
+public void 
shouldReturnPreviousStatefulTasksForConsumerInIncreasingLagOrder() {

Review comment:
   You didn't miss it, I just snuck it in there after your review :P
   
   Sorry, should have called out that I made some more changes. I think that 
was the only significant logical change though. I'll try pulling the sort out 
into the assignment code









[GitHub] [kafka] mimaison merged pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs

2020-06-04 Thread GitBox


mimaison merged pull request #8311:
URL: https://github.com/apache/kafka/pull/8311


   







[jira] [Commented] (KAFKA-10100) LiveLeaders field in LeaderAndIsrRequest is not used anymore

2020-06-04 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17125984#comment-17125984
 ] 

Ismael Juma commented on KAFKA-10100:
-

I had noticed this too. I was wondering if it's worth doing this given that 
KIP-500 will change how we propagate metadata. If there's a concrete 
performance benefit, then we should do it, of course.

> LiveLeaders field in LeaderAndIsrRequest is not used anymore
> 
>
> Key: KAFKA-10100
> URL: https://issues.apache.org/jira/browse/KAFKA-10100
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> We have noticed that the `LiveLeaders` field in the LeaderAndIsrRequest is 
> not used anywhere but still populated by the controller.
> It seems that that field was introduced in AK `0.8.0` and was supposed to be 
> removed in AK `0.8.1`: 
> [https://github.com/apache/kafka/blob/0.8.0/core/src/main/scala/kafka/cluster/Partition.scala#L194].
> I think that we can safely deprecate the field and stop populating it for all 
> versions > 0.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10101) recovery point is advanced without flushing the data after recovery

2020-06-04 Thread Jun Rao (Jira)
Jun Rao created KAFKA-10101:
---

 Summary: recovery point is advanced without flushing the data 
after recovery
 Key: KAFKA-10101
 URL: https://issues.apache.org/jira/browse/KAFKA-10101
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.5.0
Reporter: Jun Rao


Currently, in Log.recoverLog(), we set recoveryPoint to logEndOffset after 
recovering the log segment. However, we don't flush the log segments after 
recovery. The potential issue is that if the broker has another hard failure, 
segments may be corrupted on disk but won't be going through recovery on 
another restart.

This logic was introduced by KAFKA-5829 in 1.0.0.
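To make the concern concrete, here is a toy model of the situation. It is not Kafka's `Log` class: `ToyLog`, its fields, and both recovery methods are made up for illustration, and the flush-first variant is only an assumed remedy shown for contrast.

```java
// Toy model only; names are illustrative and do not mirror Kafka's Log class.
class ToyLog {
    long logEndOffset;   // offset after the last appended record
    long flushedOffset;  // everything below this is known to be on disk
    long recoveryPoint;  // a restart after a hard failure re-checks data from here

    ToyLog(long logEndOffset, long flushedOffset) {
        this.logEndOffset = logEndOffset;
        this.flushedOffset = flushedOffset;
        this.recoveryPoint = flushedOffset;
    }

    // Behavior described in the report: advance the recovery point without flushing.
    void recoverWithoutFlush() {
        recoveryPoint = logEndOffset;
    }

    // Assumed remedy for contrast: flush first, then advance.
    void recoverWithFlush() {
        flushedOffset = logEndOffset;
        recoveryPoint = logEndOffset;
    }

    // Number of offsets that are not known to be on disk but would be
    // skipped by recovery after another hard failure.
    long unprotectedOffsets() {
        return Math.max(0L, recoveryPoint - flushedOffset);
    }

    public static void main(String[] args) {
        ToyLog log = new ToyLog(100L, 40L);
        log.recoverWithoutFlush();
        System.out.println(log.unprotectedOffsets());
    }
}
```

In this model the invariant worth keeping is `recoveryPoint <= flushedOffset`; advancing without a flush breaks it.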





[jira] [Commented] (KAFKA-10090) Misleading warnings: The configuration was supplied but isn't a known config

2020-06-04 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126067#comment-17126067
 ] 

Chia-Ping Tsai commented on KAFKA-10090:


[~rwruck] Are you working on it? I can take this over if you have no free time 
:)

> Misleading warnings: The configuration was supplied but isn't a known config
> 
>
> Key: KAFKA-10090
> URL: https://issues.apache.org/jira/browse/KAFKA-10090
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Robert Wruck
>Priority: Major
>
> In our setup (using Spring cloud stream Kafka binder), we see log messages 
> like:
>  
> {{The configuration 'ssl.keystore.password' was supplied but isn't a known 
> config}}
>  
> logged by org.apache.kafka.clients.admin.AdminClientConfig. The Kafka binder 
> actually uses SSL and security.protocol is set to SSL.
> Looking through the code, a few things seem odd:
>  * The log message says "isn't a known config" but that's not true. It is 
> *known*, i.e. defined in ConfigDef, but not *used*.
>  * The method for detecting whether a config is actually *used* is not 
> complete. ChannelBuilders.channelBuilderConfigs() for example extracts the 
> configs to use for the created channel builder using *new 
> HashMap(config.values())* thus *get()* won't mark a config as used anymore.
>  
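The second point in the report can be reproduced in isolation. The sketch below is a toy model, not Kafka's config classes: `RecordingMap` is a hypothetical stand-in for a config object that marks a key as used when it is read through `get()`. Copying it into a plain `HashMap` and reading from the copy never reaches the recording `get()`, so the key still looks unused.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a config object that records which keys were read.
class RecordingMap extends HashMap<String, Object> {
    private final Set<String> used = new HashSet<>();

    RecordingMap(Map<String, Object> values) {
        super(values);
    }

    @Override
    public Object get(Object key) {
        if (key instanceof String) {
            used.add((String) key); // every read through this map marks the key as used
        }
        return super.get(key);
    }

    // Keys that were supplied but never read through this map.
    Set<String> unused() {
        Set<String> result = new HashSet<>(keySet());
        result.removeAll(used);
        return result;
    }
}

class UnusedConfigDemo {
    private static RecordingMap supplied() {
        Map<String, Object> values = new HashMap<>();
        values.put("ssl.keystore.password", "secret");
        return new RecordingMap(values);
    }

    // Reading from a plain HashMap copy never reaches RecordingMap.get().
    static Set<String> unusedAfterReadingCopy() {
        RecordingMap configs = supplied();
        Map<String, Object> copy = new HashMap<>(configs);
        copy.get("ssl.keystore.password");
        return configs.unused();
    }

    // Reading from the recording map itself marks the key as used.
    static Set<String> unusedAfterDirectRead() {
        RecordingMap configs = supplied();
        configs.get("ssl.keystore.password");
        return configs.unused();
    }

    public static void main(String[] args) {
        System.out.println(unusedAfterReadingCopy());
        System.out.println(unusedAfterDirectRead());
    }
}
```

In this model the key is reported as unused only in the copy case, which matches the misleading warning described above.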





[jira] [Commented] (KAFKA-10097) Avoid getting null map for task checkpoint

2020-06-04 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126001#comment-17126001
 ] 

Boyang Chen commented on KAFKA-10097:
-

[~feyman] Thank you for the interest! Unfortunately this ticket is still under 
discussion; I will let you know the result. Meanwhile I will look for other side 
improvements for Streams to get you started :)

> Avoid getting null map for task checkpoint
> --
>
> Key: KAFKA-10097
> URL: https://issues.apache.org/jira/browse/KAFKA-10097
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>
> In StreamTask, we have logic that generates a checkpoint offset map to be 
> materialized through StateManager#checkpoint. This map can be either an empty 
> map or null: the former indicates that we should only pull down the existing 
> state store checkpoint data, while the latter indicates that no checkpoint is 
> needed, for example when we are suspending a task.
> Having two similar special cases for checkpointing could lead to unexpected 
> bugs. We should also think about separating the empty-checkpoint case from the 
> passed-in checkpoint case.





[jira] [Commented] (KAFKA-10101) recovery point is advanced without flushing the data after recovery

2020-06-04 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126036#comment-17126036
 ] 

Jun Rao commented on KAFKA-10101:
-

If this is an issue, one way to fix that is to simply not advance recoveryPoint 
after recovery.

> recovery point is advanced without flushing the data after recovery
> ---
>
> Key: KAFKA-10101
> URL: https://issues.apache.org/jira/browse/KAFKA-10101
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.5.0
>Reporter: Jun Rao
>Priority: Major
>
> Currently, in Log.recoverLog(), we set recoveryPoint to logEndOffset after 
> recovering the log segment. However, we don't flush the log segments after 
> recovery. The potential issue is that if the broker has another hard failure, 
> segments may be corrupted on disk but won't be going through recovery on 
> another restart.
> This logic was introduced by KAFKA-5829 in 1.0.0.





[GitHub] [kafka] rhauch commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous

2020-06-04 Thread GitBox


rhauch commented on pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#issuecomment-638990957


   retest this please







[GitHub] [kafka] rhauch commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous

2020-06-04 Thread GitBox


rhauch commented on pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#issuecomment-638991218


   This should be backported to the `2.6` branch.







[GitHub] [kafka] rhauch edited a comment on pull request #8069: KAFKA-9374: Make connector interactions asynchronous

2020-06-04 Thread GitBox


rhauch edited a comment on pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#issuecomment-638991218


   This should be backported at least to the `2.6` branch.







[GitHub] [kafka] mimaison commented on pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs

2020-06-04 Thread GitBox


mimaison commented on pull request #8311:
URL: https://github.com/apache/kafka/pull/8311#issuecomment-638872285


   retest this please







[GitHub] [kafka] vvcephei commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

2020-06-04 Thread GitBox


vvcephei commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r435416204



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -562,23 +564,18 @@ private void restoreChangelog(final ChangelogMetadata 
changelogMetadata) {
 }
 
 private Map committedOffsetForChangelogs(final 
Set partitions) {
-if (partitions.isEmpty())
-return Collections.emptyMap();
-

Review comment:
   What's the idea of dropping this?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -562,23 +564,18 @@ private void restoreChangelog(final ChangelogMetadata 
changelogMetadata) {
 }
 
 private Map committedOffsetForChangelogs(final 
Set partitions) {
-if (partitions.isEmpty())
-return Collections.emptyMap();
-
 final Map committedOffsets;
 try {
-// those do not have a committed offset would default to 0
-committedOffsets =  
mainConsumer.committed(partitions).entrySet().stream()
-.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() 
== null ? 0L : e.getValue().offset()));
-} catch (final TimeoutException e) {
-// if it timed out we just retry next time.
-return Collections.emptyMap();
-} catch (final KafkaException e) {
-throw new StreamsException(String.format("Failed to retrieve end 
offsets for %s", partitions), e);
+committedOffsets = fetchCommittedOffsets(partitions, mainConsumer);
+} catch (final StreamsException e) {
+if (e.getCause() instanceof TimeoutException) {

Review comment:
   This seems to be a step backwards, actually. Why wrap it as a 
StreamsException only to immediately unwrap it again?
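For illustration, the two shapes being compared look roughly like this. The exception classes here are hypothetical stand-ins so the sketch compiles without Kafka on the classpath, and the `fetch` supplier is an assumed placeholder for the committed-offsets lookup; none of this is the PR's actual code.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

class CommittedOffsetsDemo {
    // Hypothetical stand-ins for the real exception types.
    static class ToyTimeoutException extends RuntimeException { }

    static class ToyStreamsException extends RuntimeException {
        ToyStreamsException(Throwable cause) {
            super(cause);
        }
    }

    // Shape questioned in the review: a helper wraps the timeout, the caller unwraps it.
    static Map<String, Long> wrapThenUnwrap(Supplier<Map<String, Long>> fetch) {
        try {
            try {
                return fetch.get();
            } catch (ToyTimeoutException e) {
                throw new ToyStreamsException(e);
            }
        } catch (ToyStreamsException e) {
            if (e.getCause() instanceof ToyTimeoutException) {
                return Collections.emptyMap(); // timed out: retry next time
            }
            throw e;
        }
    }

    // Alternative: let the timeout propagate unwrapped and handle it directly.
    static Map<String, Long> catchDirectly(Supplier<Map<String, Long>> fetch) {
        try {
            return fetch.get();
        } catch (ToyTimeoutException e) {
            return Collections.emptyMap(); // timed out: retry next time
        }
    }
}
```

Both methods behave the same on a timeout; the second simply avoids inspecting `getCause()`.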









[GitHub] [kafka] vvcephei commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness

2020-06-04 Thread GitBox


vvcephei commented on a change in pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#discussion_r43547



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##
@@ -302,14 +300,19 @@ public void computeTaskLags(final UUID uuid, final 
Map allTaskEndO
  * @return end offset sum - offset sum
  *  Task.LATEST_OFFSET if this was previously an active running 
task on this client
  */
-long lagFor(final TaskId task) {
-final Long totalLag = taskLagTotals.get(task);
+public long lagFor(final TaskId task) {
+final Long totalLag;
+if (taskLagTotals.isEmpty()) {
+// If we couldn't compute the task lags due to failure to fetch 
offsets, just return a flat constant
+totalLag = 0L;

Review comment:
   Is this the right constant to represent "we don't know the lag"? Or did 
I mistake how this is going to be used?
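To illustrate the concern: a flat `0L` cannot be told apart from a task that is genuinely caught up. One alternative, sketched here with hypothetical names (`LagLookup` and its methods are not from the PR, and task ids are plain strings for brevity), is to make "unknown" a distinct result.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

class LagLookup {
    private final Map<String, Long> taskLagTotals = new HashMap<>();

    void record(String taskId, long lag) {
        taskLagTotals.put(taskId, lag);
    }

    // An empty result means "lag unknown", which stays distinct from a real lag of 0.
    OptionalLong lagFor(String taskId) {
        Long lag = taskLagTotals.get(taskId);
        return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
    }
}
```

A caller that really wants a flat constant can still write `lagFor(id).orElse(0L)`, but then the choice is visible at the call site.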









[jira] [Commented] (KAFKA-10101) recovery point is advanced without flushing the data after recovery

2020-06-04 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126023#comment-17126023
 ] 

Jun Rao commented on KAFKA-10101:
-

[~ijuma] : Could you double check if this is an issue? Thanks.

> recovery point is advanced without flushing the data after recovery
> ---
>
> Key: KAFKA-10101
> URL: https://issues.apache.org/jira/browse/KAFKA-10101
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.5.0
>Reporter: Jun Rao
>Priority: Major
>
> Currently, in Log.recoverLog(), we set recoveryPoint to logEndOffset after 
> recovering the log segment. However, we don't flush the log segments after 
> recovery. The potential issue is that if the broker has another hard failure, 
> segments may be corrupted on disk but won't be going through recovery on 
> another restart.
> This logic was introduced by KAFKA-5829 in 1.0.0.





[jira] [Resolved] (KAFKA-9576) Topic creation failure causing Stream thread death

2020-06-04 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-9576.

Resolution: Duplicate

> Topic creation failure causing Stream thread death
> --
>
> Key: KAFKA-9576
> URL: https://issues.apache.org/jira/browse/KAFKA-9576
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Priority: Major
>
> The failure to create an internal topic could lead to the stream thread death 
> due to timeout:
> {code:java}
> [2020-02-14T03:03:00-08:00] 
> (streams-soak-2-4-eos_soak_i-01c4a64bbd04974db_streamslog) [2020-02-14 
> 11:03:00,083] ERROR 
> [stream-soak-test-c818a925-a8fd-4a81-9a26-1c744d52ff2f-StreamThread-1] 
> stream-thread [main] Unexpected error during topic description for 
> stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog.
> [2020-02-14T03:03:00-08:00] 
> (streams-soak-2-4-eos_soak_i-01c4a64bbd04974db_streamslog) [2020-02-14 
> 11:03:00,083] ERROR 
> [stream-soak-test-c818a925-a8fd-4a81-9a26-1c744d52ff2f-StreamThread-1] 
> stream-thread 
> [stream-soak-test-c818a925-a8fd-4a81-9a26-1c744d52ff2f-StreamThread-1] 
> Encountered the following unexpected Kafka exception during processing, this 
> usually indicate Streams internal errors: 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-02-14T03:03:00-08:00] 
> (streams-soak-2-4-eos_soak_i-01c4a64bbd04974db_streamslog) 
> org.apache.kafka.streams.errors.StreamsException: Could not create topic 
> stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog.
>         at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:209)
>         at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:223)
>         at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:106)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
>         at 
> 

[GitHub] [kafka] bbejeck commented on pull request #8789: MINOR: Fix the javadoc broken links of streams

2020-06-04 Thread GitBox


bbejeck commented on pull request #8789:
URL: https://github.com/apache/kafka/pull/8789#issuecomment-638979955


   Thanks for the contribution, @vitojeng.  This PR looks good to me, I'd like 
to verify the links locally, but after that, we'll get this merged.







[jira] [Created] (KAFKA-10103) JDBC Sink Connector doesn't support numerical values in event keys

2020-06-04 Thread Jakub (Jira)
Jakub created KAFKA-10103:
-

 Summary: JDBC Sink Connector doesn't support numerical values in 
event keys
 Key: KAFKA-10103
 URL: https://issues.apache.org/jira/browse/KAFKA-10103
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.1
Reporter: Jakub


Our topics contain events with numerical keys and Avro values. We're trying to 
configure a JDBC connector to export data from these topics to Oracle DB, but 
it doesn't seem to work.

If we use strings as keys everything works fine, but if we switch to Longs it 
stops working. We tried different values of _key.converter_, including 
_org.apache.kafka.connect.converters.ByteArrayConverter_, but either parsing of 
the keys doesn't work, or they cannot be mapped to the Oracle data type (NUMBER; 
this happens if we use _ByteArrayConverter_).

We also tried using transformations (CAST), but in that case we get 
_Cast transformation does not support casting to/from BYTES_.

Please excuse us if this is not a bug and there is a way to work with numerical 
keys; we just couldn't find anything about that in the documentation.





[GitHub] [kafka] heritamas commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-06-04 Thread GitBox


heritamas commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r435424111



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##
@@ -132,7 +132,7 @@ public String version() {
 return listConsumerGroupOffsets(group).entrySet().stream()
 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
-.filter(x -> x.downstreamOffset() > 0)  // ignore offsets we 
cannot translate accurately
+.filter(x -> x.downstreamOffset() >= 0)  // ignore offsets we 
cannot translate accurately

Review comment:
   Can downstream offset be negative?









[GitHub] [kafka] hachikuji commented on pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

2020-06-04 Thread GitBox


hachikuji commented on pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#issuecomment-638962844


   ok to test







[GitHub] [kafka] omkreddy commented on pull request #8717: KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a non-existent topic's config

2020-06-04 Thread GitBox


omkreddy commented on pull request #8717:
URL: https://github.com/apache/kafka/pull/8717#issuecomment-639036963


   @bdbyrne Can you rebase the PR?







[jira] [Commented] (KAFKA-9377) Refactor StreamsPartitionAssignor Repartition Count logic

2020-06-04 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126166#comment-17126166
 ] 

Boyang Chen commented on KAFKA-9377:


[~feyman] Hey, I'm assigning this ticket to you. A little more context:

You could search for the function 
`StreamsPartitionAssignor#setRepartitionTopicMetadataNumberOfPartitions` and 
look at what it does currently. It basically tries to initialize the repartition 
topic partition count of every node by doing a random walk-through of every node 
in an infinite loop. This is neither efficient nor intuitive, and we have been 
planning to refactor it into a bottom-up DFS, meaning that a parent node 
can only be initialized after all its children's topic partitions are 
initialized. Another reference is the unit test case 
`StreamsPartitionAssignor#shouldNotFailOnBranchedMultiLevelRepartitionConnectedTopology`,
which validates a bug we fixed inside this logic a while ago and 
hopefully gives you better insight.

Let me know if this makes sense to you. I know it's a bit unfriendly for a 
beginner task, but it is definitely worth the effort to dig in and understand 
how the KStream topology is created.
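As a rough picture of the idea, here is a memoized DFS over a simplified dependency graph. Everything in it is an assumption made for illustration: the `RepartitionCounts` class, the `upstream` map shape, and the rule that a topic takes the maximum partition count of what it reads from. It is not the assignor's real data model, and which side counts as parent or child is left open.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RepartitionCounts {
    // upstream: topic -> topics it reads from (hypothetical simplified topology)
    // known:    partition counts that are already resolved; filled in as we go
    static int resolve(String topic,
                       Map<String, List<String>> upstream,
                       Map<String, Integer> known) {
        return resolve(topic, upstream, known, new HashSet<>());
    }

    private static int resolve(String topic,
                               Map<String, List<String>> upstream,
                               Map<String, Integer> known,
                               Set<String> inProgress) {
        Integer cached = known.get(topic);
        if (cached != null) {
            return cached; // already initialized, nothing to do
        }
        if (!inProgress.add(topic)) {
            throw new IllegalStateException("Cycle detected at " + topic);
        }
        int max = 0;
        // Resolve every dependency first; only then can this topic be initialized.
        for (String dep : upstream.getOrDefault(topic, Collections.emptyList())) {
            max = Math.max(max, resolve(dep, upstream, known, inProgress));
        }
        inProgress.remove(topic);
        if (max == 0) {
            throw new IllegalStateException("No partition count available for " + topic);
        }
        known.put(topic, max);
        return max;
    }
}
```

Each topic is visited once and the traversal order is deterministic, in contrast to looping until nothing is left uninitialized.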

> Refactor StreamsPartitionAssignor Repartition Count logic
> -
>
> Key: KAFKA-9377
> URL: https://issues.apache.org/jira/browse/KAFKA-9377
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: feyman
>Priority: Major
>
> The current repartition count uses a big while loop to randomly initialize 
> each repartition topic counts, which is error-prone and hard to maintain. A 
> more intuitive and robust solution would be doing a DFS search from 
> bottom-up, where we initialize all the sink nodes repartition topic counts by 
> making sure all their parents are initialized.





[GitHub] [kafka] vvcephei commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-04 Thread GitBox


vvcephei commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r435523829



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final 
Map changelogEntry : 
changelogOffsets.entrySet()) {
 final long offset = changelogEntry.getValue();
 
-offsetSum += offset;
-if (offsetSum < 0) {
-log.warn("Sum of changelog offsets for task {} overflowed, 
pinning to Long.MAX_VALUE", id);
-return Long.MAX_VALUE;
+if (offset == Task.LATEST_OFFSET) {
+return Task.LATEST_OFFSET;
+} else {
+offsetSum += offset;

Review comment:
   It might be nice to have a sanity check here that `offset` is 
non-negative, since that would indicate we've unexpectedly received a sentinel 
value. I thought we did that already, but it's obviously not here.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final 
Map changelogEntry : 
changelogOffsets.entrySet()) {
 final long offset = changelogEntry.getValue();
 
-offsetSum += offset;
-if (offsetSum < 0) {
-log.warn("Sum of changelog offsets for task {} overflowed, 
pinning to Long.MAX_VALUE", id);
-return Long.MAX_VALUE;
+if (offset == Task.LATEST_OFFSET) {

Review comment:
   This seems pretty subtle, can you convert your GH explanation into a 
code comment?
   
   It also seems worth mentioning that we assume that if any changelog offset in 
the task is "latest", then the whole task is active, and we therefore return 
"latest". Took me a minute to work that out.
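Putting the two review comments together, a commented version with a sanity check might look like the sketch below. It is standalone and hypothetical: `OffsetSum` is not `TaskManager`, the sentinel value `-2L` is an assumption made for this example, and keeping the overflow pinning from the removed lines is also an assumption.

```java
import java.util.Arrays;
import java.util.Collection;

class OffsetSum {
    // Assumed sentinel value for this sketch; the real constant lives on Task.
    static final long LATEST_OFFSET = -2L;

    static long sumOfChangelogOffsets(Collection<Long> changelogOffsets) {
        long offsetSum = 0L;
        for (long offset : changelogOffsets) {
            if (offset == LATEST_OFFSET) {
                // If any changelog offset is "latest", assume the whole task is
                // active and report "latest" for the task as a whole.
                return LATEST_OFFSET;
            }
            if (offset < 0) {
                // Any other negative value would be an unexpected sentinel.
                throw new IllegalStateException("Unexpected negative offset: " + offset);
            }
            offsetSum += offset;
            if (offsetSum < 0) {
                // Two non-negative longs overflowed; pin to the maximum.
                return Long.MAX_VALUE;
            }
        }
        return offsetSum;
    }

    public static void main(String[] args) {
        System.out.println(sumOfChangelogOffsets(Arrays.asList(1L, 2L, 3L)));
    }
}
```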





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy closed pull request #8715: KAFKA-10033: AdminClient should throw UnknownTopicOrPartitionException instead of UnknownServerException if altering configs of non-existing topic

2020-06-04 Thread GitBox


omkreddy closed pull request #8715:
URL: https://github.com/apache/kafka/pull/8715


   







[GitHub] [kafka] omkreddy commented on a change in pull request #8717: KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a non-existent topic's config

2020-06-04 Thread GitBox


omkreddy commented on a change in pull request #8717:
URL: https://github.com/apache/kafka/pull/8717#discussion_r435468757



##
File path: core/src/main/scala/kafka/server/AdminManager.scala
##
@@ -454,6 +454,9 @@ class AdminManager(val config: KafkaConfig,
   private def alterTopicConfigs(resource: ConfigResource, validateOnly: 
Boolean,
 configProps: Properties, configEntriesMap: 
Map[String, String]): (ConfigResource, ApiError) = {
 val topic = resource.name
+if (!metadataCache.contains(topic))
+  throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not 
exist.")
+
 adminZkClient.validateTopicConfig(topic, configProps)

Review comment:
   Yeah, there is a lot of scope for cleanup. We can improve this in future PRs.









[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

2020-06-04 Thread GitBox


vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-639068238


   Test this please







[GitHub] [kafka] hachikuji commented on pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-06-04 Thread GitBox


hachikuji commented on pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#issuecomment-639102074


   ok to test







[GitHub] [kafka] szalapski commented on pull request #5876: KAFKA-7509: Avoid passing most non-applicable properties to producer, consumer, and admin client

2020-06-04 Thread GitBox


szalapski commented on pull request #5876:
URL: https://github.com/apache/kafka/pull/5876#issuecomment-639110680


   I still hope this can be fixed soon.







[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

2020-06-04 Thread GitBox


vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-639128329











[GitHub] [kafka] xvrl opened a new pull request #8800: [MINOR] fix incorrect GC log size with JDK9+

2020-06-04 Thread GitBox


xvrl opened a new pull request #8800:
URL: https://github.com/apache/kafka/pull/8800


   file size was incorrectly set to 100KB instead of 100MB
   







[GitHub] [kafka] d8tltanc commented on a change in pull request #8421: KAFKA-9800: [KIP-580] Admin Client Exponential Backoff Implementation

2020-06-04 Thread GitBox


d8tltanc commented on a change in pull request #8421:
URL: https://github.com/apache/kafka/pull/8421#discussion_r435492178



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -683,21 +686,65 @@ public Node provide() {
 }
 }
 
+/**
+ * Provides context which the retry may refer to
+ */
+class CallRetryContext {
+
+private int tries = 0;
+private long nextAllowedTryMs = 0;
+private final double JITTER_MIN = 0.8;
+private final double JITTER_MAX = 1.2;
+
+public int tries() {
+return tries;
+}
+
+public long nextAllowedTryMs() {
+return nextAllowedTryMs;
+}
+
+private void updateTries(int currentTries) {
+this.tries = currentTries + 1;
+}
+
+private void updateNextAllowTryMs() {
+double jitter = Math.random() * (JITTER_MAX - JITTER_MIN) + 
JITTER_MIN;
+int failures = tries - 1;
+double exp = Math.pow(2, failures);
+this.nextAllowedTryMs = time.milliseconds() +
+(long) Math.min(retryBackoffMaxMs, jitter * exp * 
retryBackoffMs);
+// TODO: Remove the line below
+System.out.println("nextAllow = " +  (long) 
Math.min(retryBackoffMaxMs, jitter * exp * retryBackoffMs));
+}

Review comment:
   Makes sense. I will extract it to a util class in my KIP-601 implementation.
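
The backoff computation in the diff above could be pulled into a small utility along the following lines. This is only a sketch under stated assumptions: the class name `ExponentialBackoffSketch`, its constructor parameters, and the injected random source are hypothetical and are not taken from the PR or from KIP-601. Only the formula (jitter in a fixed range, doubling per failure, capped at a maximum) follows the quoted code.

```java
import java.util.function.DoubleSupplier;

// Hypothetical utility; names and constructor shape are assumptions, not the PR's API.
final class ExponentialBackoffSketch {
    private final long initialMs;      // plays the role of retryBackoffMs in the diff
    private final long maxMs;          // plays the role of retryBackoffMaxMs in the diff
    private final double jitterMin;    // 0.8 in the diff
    private final double jitterMax;    // 1.2 in the diff
    private final DoubleSupplier random; // returns values in [0, 1), e.g. Math::random

    ExponentialBackoffSketch(long initialMs, long maxMs,
                             double jitterMin, double jitterMax,
                             DoubleSupplier random) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.jitterMin = jitterMin;
        this.jitterMax = jitterMax;
        this.random = random;
    }

    // Delay to wait after the given number of failures (0 for the first retry).
    long backoffMs(int failures) {
        double jitter = random.getAsDouble() * (jitterMax - jitterMin) + jitterMin;
        double exp = Math.pow(2, failures);
        return (long) Math.min(maxMs, jitter * exp * initialMs);
    }
}
```

Injecting the random source is a design choice made here so the jitter can be pinned in tests; production code would presumably pass `Math::random` or a thread-local generator.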









[GitHub] [kafka] dajac opened a new pull request #8801: KAFKA-10100; LiveLeaders field in LeaderAndIsrRequest is not used anymore

2020-06-04 Thread GitBox


dajac opened a new pull request #8801:
URL: https://github.com/apache/kafka/pull/8801


   We have noticed that the `LiveLeaders` field in the LeaderAndIsrRequest is 
not used anywhere but still populated by the controller.
   
   It seems that this field was introduced in AK `0.8.0` and was supposed to be 
removed in AK `0.8.1`: 
https://github.com/apache/kafka/blob/0.8.0/core/src/main/scala/kafka/cluster/Partition.scala#L194.
 It has not been used since then.
   
   This PR proposes to simply stop populating the field for any version > 0. This 
avoids computing the live leaders set and also reduces the size of the request 
on the wire.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] rhauch commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous

2020-06-04 Thread GitBox


rhauch commented on pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#issuecomment-639076340


   retest this please







[GitHub] [kafka] ableegoldman commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-04 Thread GitBox


ableegoldman commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r43000



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final 
Map changelogEntry : 
changelogOffsets.entrySet()) {
 final long offset = changelogEntry.getValue();
 
-offsetSum += offset;
-if (offsetSum < 0) {
-log.warn("Sum of changelog offsets for task {} overflowed, 
pinning to Long.MAX_VALUE", id);
-return Long.MAX_VALUE;
+if (offset == Task.LATEST_OFFSET) {

Review comment:
   Well, this is by definition of the sentinel `LATEST_OFFSET` -- but I 
agree it's subtle that if any offset is "latest" then we know they all are, i.e. 
the task is active and RUNNING.
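
The sentinel short-circuit being discussed can be sketched in isolation as follows. This is an illustrative sketch, not the PR's code: the class name is hypothetical, the map is keyed by plain strings instead of topic partitions, and the value `-2L` for the sentinel is an assumption made here so the example is self-contained. The overflow guard mirrors the lines shown as removed in the diff; whether the final version keeps it is not visible in this thread.

```java
import java.util.Map;

// Hypothetical stand-in for the offset-sum logic in TaskManager.
final class ChangelogOffsetSumSketch {
    // Assumed sentinel value standing in for Task.LATEST_OFFSET.
    static final long LATEST_OFFSET = -2L;

    static long sumOfChangelogOffsets(Map<String, Long> changelogOffsets) {
        long offsetSum = 0L;
        for (Map.Entry<String, Long> entry : changelogOffsets.entrySet()) {
            long offset = entry.getValue();
            // If any changelog reports the sentinel, the whole task is treated
            // as active and caught up, so the sentinel is returned unchanged.
            if (offset == LATEST_OFFSET) {
                return LATEST_OFFSET;
            }
            offsetSum += offset;
            // A negative sum after adding non-negative offsets means overflow.
            if (offsetSum < 0) {
                return Long.MAX_VALUE;
            }
        }
        return offsetSum;
    }
}
```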









[GitHub] [kafka] vvcephei commented on pull request #8107: MINOR: Remove Diamond and code code Alignment

2020-06-04 Thread GitBox


vvcephei commented on pull request #8107:
URL: https://github.com/apache/kafka/pull/8107#issuecomment-639124739


   Ok to test







[GitHub] [kafka] omkreddy commented on pull request #8715: KAFKA-10033: AdminClient should throw UnknownTopicOrPartitionException instead of UnknownServerException if altering configs of non-existing

2020-06-04 Thread GitBox


omkreddy commented on pull request #8715:
URL: https://github.com/apache/kafka/pull/8715#issuecomment-639031210


   @gnkoshelev Thanks for the PR. LGTM. I think we can merge both PRs (#8717). 
`AdminZkClient.validateTopicConfig()` is used in command line tools (this 
will go away when we remove the ZK dependency). We may want to throw the same 
error in all places.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-04 Thread GitBox


ableegoldman commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r435552502



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final 
Map changelogEntry : 
changelogOffsets.entrySet()) {
 final long offset = changelogEntry.getValue();
 
-offsetSum += offset;
-if (offsetSum < 0) {
-log.warn("Sum of changelog offsets for task {} overflowed, 
pinning to Long.MAX_VALUE", id);
-return Long.MAX_VALUE;
+if (offset == Task.LATEST_OFFSET) {
+return Task.LATEST_OFFSET;
+} else {
+offsetSum += offset;

Review comment:
   We put the check for negative offsets in the RecordCollector, but I 
guess it doesn't hurt to check again here?









[GitHub] [kafka] vvcephei commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-04 Thread GitBox


vvcephei commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r435559860



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final 
Map changelogEntry : 
changelogOffsets.entrySet()) {
 final long offset = changelogEntry.getValue();
 
-offsetSum += offset;
-if (offsetSum < 0) {
-log.warn("Sum of changelog offsets for task {} overflowed, 
pinning to Long.MAX_VALUE", id);
-return Long.MAX_VALUE;
+if (offset == Task.LATEST_OFFSET) {
+return Task.LATEST_OFFSET;
+} else {
+offsetSum += offset;

Review comment:
   Aha! Thanks. Yeah, I'd be in favor of coding defensively here as well.
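
One way the defensive check agreed on here might look is sketched below. It is a hypothetical helper, not code from the PR: the class and method names, the exception type, and the sentinel value `-2L` are all assumptions chosen for illustration.

```java
// Hypothetical helper illustrating a defensive re-check of changelog offsets.
final class DefensiveOffsetCheckSketch {
    // Assumed sentinel value standing in for Task.LATEST_OFFSET.
    static final long LATEST_OFFSET = -2L;

    // Returns the offset unchanged if it is the sentinel or non-negative,
    // and fails loudly on any other negative value.
    static long validated(long offset) {
        if (offset == LATEST_OFFSET) {
            return offset;
        }
        if (offset < 0) {
            throw new IllegalStateException("Unexpected negative offset: " + offset);
        }
        return offset;
    }
}
```

Failing fast at the summing site costs one comparison per changelog and keeps a bad offset from silently corrupting the sum, even if an upstream check already exists.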









[jira] [Resolved] (KAFKA-10069) The user-defined "predicate" and "negate" are not removed from Transformation

2020-06-04 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-10069.

Resolution: Fixed

> The user-defined "predicate" and "negate" are not removed from Transformation
> -
>
> Key: KAFKA-10069
> URL: https://issues.apache.org/jira/browse/KAFKA-10069
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
> Fix For: 2.6.0
>
>
> There are official configDefs for both "predicate" and "negate", so we should 
> remove the user-defined configDefs. However, the current behavior performs an 
> incorrect comparison, so the duplicate key will destroy the following embedded configDef.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10069) The user-defined "predicate" and "negate" are not removed from Transformation

2020-06-04 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-10069:
---
Fix Version/s: 2.6.0

> The user-defined "predicate" and "negate" are not removed from Transformation
> -
>
> Key: KAFKA-10069
> URL: https://issues.apache.org/jira/browse/KAFKA-10069
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
> Fix For: 2.6.0
>
>
> There are official configDefs for both "predicate" and "negate", so we should 
> remove the user-defined configDefs. However, the current behavior performs an 
> incorrect comparison, so the duplicate key will destroy the following embedded configDef.





[GitHub] [kafka] hachikuji commented on pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

2020-06-04 Thread GitBox


hachikuji commented on pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#issuecomment-639033169


   retest this please







[jira] [Assigned] (KAFKA-9377) Refactor StreamsPartitionAssignor Repartition Count logic

2020-06-04 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-9377:
--

Assignee: feyman  (was: Boyang Chen)

> Refactor StreamsPartitionAssignor Repartition Count logic
> -
>
> Key: KAFKA-9377
> URL: https://issues.apache.org/jira/browse/KAFKA-9377
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: feyman
>Priority: Major
>
> The current repartition count logic uses a big while loop to randomly initialize 
> each repartition topic's count, which is error-prone and hard to maintain. A 
> more intuitive and robust solution would be a bottom-up DFS, 
> where we initialize the repartition topic counts of all the sink nodes by 
> making sure all their parents are initialized.





[jira] [Created] (KAFKA-10104) Remove deprecated --zookeeper flags as specified in KIP-604

2020-06-04 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-10104:


 Summary: Remove deprecated --zookeeper flags as specified in 
KIP-604
 Key: KAFKA-10104
 URL: https://issues.apache.org/jira/browse/KAFKA-10104
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe


Remove deprecated --zookeeper flags as specified in KIP-604





[GitHub] [kafka] C0urante commented on pull request #8135: KAFKA-9570: Define SSL configs in all worker config classes, not just distributed

2020-06-04 Thread GitBox


C0urante commented on pull request #8135:
URL: https://github.com/apache/kafka/pull/8135#issuecomment-639008252


   @rhauch would it be possible to backport this to 2.6 before the upcoming 
release?







[GitHub] [kafka] rhauch commented on pull request #8135: KAFKA-9570: Define SSL configs in all worker config classes, not just distributed

2020-06-04 Thread GitBox


rhauch commented on pull request #8135:
URL: https://github.com/apache/kafka/pull/8135#issuecomment-639015817











[jira] [Commented] (KAFKA-10090) Misleading warnings: The configuration was supplied but isn't a known config

2020-06-04 Thread Robert Wruck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126132#comment-17126132
 ] 

Robert Wruck commented on KAFKA-10090:
--

[~chia7712] That would be great. I'm not really familiar with the code, so I'd 
have to decide whether to implement a fix in ChannelBuilders or RecordingMap.

> Misleading warnings: The configuration was supplied but isn't a known config
> 
>
> Key: KAFKA-10090
> URL: https://issues.apache.org/jira/browse/KAFKA-10090
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Robert Wruck
>Priority: Major
>
> In our setup (using Spring cloud stream Kafka binder), we see log messages 
> like:
>  
> {{The configuration 'ssl.keystore.password' was supplied but isn't a known 
> config}}
>  
> logged by org.apache.kafka.clients.admin.AdminClientConfig. The Kafka binder 
> actually uses SSL and security.protocol is set to SSL.
> Looking through the code, a few things seem odd:
>  * The log message says "isn't a known config" but that's not true. It is 
> *known*, i.e. defined in ConfigDef, but not *used*.
>  * The method for detecting whether a config is actually *used* is not 
> complete. ChannelBuilders.channelBuilderConfigs() for example extracts the 
> configs to use for the created channel builder using *new 
> HashMap(config.values())* thus *get()* won't mark a config as used anymore.
>  
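
The second point in the report, that copying a usage-tracking map into a plain `HashMap` stops `get()` from marking keys as used, can be reproduced with a small stand-in. The class below is a hypothetical simplification written for this illustration; it is not Kafka's `RecordingMap`, and the method names are assumptions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical simplification of a map that records which keys were read.
final class UsageTrackingMapSketch extends HashMap<String, Object> {
    private static final long serialVersionUID = 1L;
    private final Set<String> used = new HashSet<>();

    UsageTrackingMapSketch(Map<String, ?> source) {
        super(source);
    }

    @Override
    public Object get(Object key) {
        if (key instanceof String) {
            used.add((String) key); // reading through this map marks the key as used
        }
        return super.get(key);
    }

    // Keys that were supplied but never read through this map.
    Set<String> unused() {
        Set<String> result = new HashSet<>(keySet());
        result.removeAll(used);
        return result;
    }

    // Demonstrates the reported problem: reads on a plain copy are invisible
    // to the tracking map, so the key still looks unused.
    static Set<String> unusedAfterReadingFromCopy() {
        Map<String, Object> supplied = new HashMap<>();
        supplied.put("ssl.keystore.password", "secret");
        UsageTrackingMapSketch tracked = new UsageTrackingMapSketch(supplied);
        Map<String, Object> copy = new HashMap<>(tracked); // copy does not call get()
        copy.get("ssl.keystore.password");                 // read is not recorded
        return tracked.unused();
    }
}
```

In this sketch the copy constructor iterates the source's entries rather than calling `get()`, which is why the key remains in `unused()` even though its value was actually consumed.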





[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-06-04 Thread GitBox


abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r435505756



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -585,34 +585,29 @@ public void 
testFetchProgressWithMissingPartitionPosition() {
 consumer.seekToBeginning(singleton(tp1));
 
 client.prepareResponse(
-new MockClient.RequestMatcher() {
-@Override
-public boolean matches(AbstractRequest body) {
-ListOffsetRequest request = (ListOffsetRequest) body;
-Map 
timestamps = request.partitionTimestamps();
-return timestamps.get(tp0).timestamp == 
ListOffsetRequest.LATEST_TIMESTAMP &&
-timestamps.get(tp1).timestamp == 
ListOffsetRequest.EARLIEST_TIMESTAMP;
-}
-}, listOffsetsResponse(Collections.singletonMap(tp0, 50L),
+body -> {
+ListOffsetRequest request = (ListOffsetRequest) body;
+Map 
timestamps = request.partitionTimestamps();
+return timestamps.get(tp0).timestamp == 
ListOffsetRequest.LATEST_TIMESTAMP &&
+timestamps.get(tp1).timestamp == 
ListOffsetRequest.EARLIEST_TIMESTAMP;
+}, listOffsetsResponse(Collections.singletonMap(tp0, 50L),
 Collections.singletonMap(tp1, 
Errors.NOT_LEADER_FOR_PARTITION)));
 client.prepareResponse(
-new MockClient.RequestMatcher() {
-@Override
-public boolean matches(AbstractRequest body) {
-FetchRequest request = (FetchRequest) body;
-return 
request.fetchData().keySet().equals(singleton(tp0)) &&
-request.fetchData().get(tp0).fetchOffset == 
50L;
+body -> {
+FetchRequest request = (FetchRequest) body;
+return request.fetchData().keySet().equals(singleton(tp0)) &&
+request.fetchData().get(tp0).fetchOffset == 50L;
 
-}
-}, fetchResponse(tp0, 50L, 5));
+}, fetchResponse(tp0, 50L, 5));
 
 ConsumerRecords records = 
consumer.poll(Duration.ofMillis(1));
 assertEquals(5, records.count());
 assertEquals(singleton(tp0), records.partitions());
 }
 
 private void initMetadata(MockClient mockClient, Map 
partitionCounts) {
-MetadataResponse initialMetadata = TestUtils.metadataUpdateWith(1, 
partitionCounts);
+int leaderEpoch = 1;

Review comment:
   This is the actual fix; the other parts are mostly cleanup.









[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-04 Thread GitBox


mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r435589707



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final 
Map changelogEntry : 
changelogOffsets.entrySet()) {
 final long offset = changelogEntry.getValue();
 
-offsetSum += offset;
-if (offsetSum < 0) {
-log.warn("Sum of changelog offsets for task {} overflowed, 
pinning to Long.MAX_VALUE", id);
-return Long.MAX_VALUE;
+if (offset == Task.LATEST_OFFSET) {
+return Task.LATEST_OFFSET;
+} else {
+offsetSum += offset;

Review comment:
   Ack









[GitHub] [kafka] mjsax commented on pull request #8771: MINOR: Add explanation for disabling forwarding from value transformers

2020-06-04 Thread GitBox


mjsax commented on pull request #8771:
URL: https://github.com/apache/kafka/pull/8771#issuecomment-639157574


   Thanks for the PR @astubbs!
   
   Merged to `trunk` and cherry-picked to `2.6`.







[GitHub] [kafka] hachikuji opened a new pull request #8802: MINOR: Fix fetch session epoch comment in `FetchRequest.json`

2020-06-04 Thread GitBox


hachikuji opened a new pull request #8802:
URL: https://github.com/apache/kafka/pull/8802


   The current "about" string incorrectly describes the session epoch as the 
partition epoch. Rename to `SessionEpoch` to make usage clearer. Also rename 
`MaxWait` to `MaxWaitTimeMs` and `FetchableTopic` to `FetchTopic` for 
consistency with `FetchPartition`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] junrao commented on a change in pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)

2020-06-04 Thread GitBox


junrao commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r435558683



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##
@@ -113,14 +127,44 @@ public static ApiVersionsResponse fromStruct(Struct 
struct, short version) {
 }
 }
 
-public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, 
byte maxMagic) {
+public static ApiVersionsResponse apiVersionsResponse(
+int throttleTimeMs,
+byte maxMagic,
+Features latestSupportedFeatures) {
+return apiVersionsResponse(
+throttleTimeMs, maxMagic, latestSupportedFeatures, 
Optional.empty(), Optional.empty());
+}
+
+public static ApiVersionsResponse apiVersionsResponse(
+int throttleTimeMs,
+byte maxMagic,
+Features latestSupportedFeatures,
+Features finalizedFeatures,
+int finalizedFeaturesEpoch) {
+return apiVersionsResponse(
+throttleTimeMs, maxMagic, latestSupportedFeatures, 
Optional.of(finalizedFeatures), Optional.of(finalizedFeaturesEpoch));
+}
+
+private static ApiVersionsResponse apiVersionsResponse(
+int throttleTimeMs,
+byte maxMagic,
+Features latestSupportedFeatures,
+Optional> finalizedFeatures,
+Optional finalizedFeaturesEpoch) {

Review comment:
   Could we pass in `Optional` instead of two 
separate Optional?
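
As a rough illustration of the suggestion, the two values that are always present or absent together could be wrapped in one holder and passed as a single `Optional`. The type parameter in the comment above was lost in the archive, so the holder name `FinalizedFeaturesAndEpochSketch`, its fields, and the `describe` method below are hypothetical and only show the shape of the idea.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical holder pairing finalized features with their epoch.
final class FinalizedFeaturesAndEpochSketch {
    final Map<String, Short> features;
    final int epoch;

    FinalizedFeaturesAndEpochSketch(Map<String, Short> features, int epoch) {
        this.features = features;
        this.epoch = epoch;
    }
}

// Hypothetical consumer taking one Optional instead of two parallel ones.
final class ApiVersionsSketch {
    static String describe(Optional<FinalizedFeaturesAndEpochSketch> finalized) {
        return finalized
            .map(f -> "epoch=" + f.epoch + ", features=" + f.features)
            .orElse("no finalized features");
    }
}
```

A single `Optional` removes the invalid states in which only one of the two values is present.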

##
File path: core/src/test/scala/unit/kafka/server/SupportedFeaturesTest.scala
##
@@ -0,0 +1,39 @@
+package kafka.server

Review comment:
   missing license header

##
File path: core/src/main/scala/kafka/cluster/Broker.scala
##
@@ -34,14 +36,21 @@ object Broker {
  brokerId: Int,
  endpoints: util.List[Endpoint],
  interBrokerEndpoint: Endpoint) 
extends AuthorizerServerInfo
+
+  def apply(id: Int, endPoints: Seq[EndPoint], rack: Option[String]): Broker = 
{
+new Broker(id, endPoints, rack, emptySupportedFeatures)
+  }
 }
 
 /**
  * A Kafka broker.
- * A broker has an id, a collection of end-points, an optional rack and a 
listener to security protocol map.
- * Each end-point is (host, port, listenerName).
+ *
+ * @param id  a broker id
+ * @param endPoints   a collection of EndPoint. Each end-point is (host, port, 
listener name, security protocol).
+ * @param rackan optional rack
+ * @param featuresoptional supported features

Review comment:
   The comment can be a bit misleading since features is not Optional.

##
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+import scala.util.control.Exception.ignoring
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce an optional latch that can be used to notify 
the caller when an
+   *updateOrThrow() operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+def 

[GitHub] [kafka] ableegoldman commented on pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

2020-06-04 Thread GitBox


ableegoldman commented on pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#issuecomment-639245176


   Ready for review @abbccdda @guozhangwang 







[GitHub] [kafka] ableegoldman opened a new pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

2020-06-04 Thread GitBox


ableegoldman opened a new pull request #8803:
URL: https://github.com/apache/kafka/pull/8803


   Rather than recreate the entire topology when we find new regex matched 
input partitions, we can just update the relevant pieces.
   
   Only the SourceNodes and ProcessorTopology seem to care about the input 
topics, and we can actually extract this information out of the SourceNode 
altogether by pulling it into the ProcessorTopology.
   







[GitHub] [kafka] ableegoldman commented on a change in pull request #8799: KAFKA-8011: Fix flaky RegexSourceIntegrationTest

2020-06-04 Thread GitBox


ableegoldman commented on a change in pull request #8799:
URL: https://github.com/apache/kafka/pull/8799#discussion_r435588847



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##
@@ -142,83 +142,89 @@ public void tearDown() throws IOException {
 
 @Test
 public void testRegexMatchesTopicsAWhenCreated() throws Exception {
+try {
+final Serde stringSerde = Serdes.String();
 
-final Serde stringSerde = Serdes.String();
-
-final List expectedFirstAssignment = 
Collections.singletonList("TEST-TOPIC-1");
-// we compare lists of subscribed topics and hence requiring the order 
as well; this is guaranteed
-// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add 
TEST-TOPIC-2 so the list is always
-// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 
behavior ever changed it may become a flaky test
-final List expectedSecondAssignment = 
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
+final List expectedFirstAssignment = 
Collections.singletonList("TEST-TOPIC-1");
+// we compare lists of subscribed topics and hence requiring the 
order as well; this is guaranteed
+// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only 
add TEST-TOPIC-2 so the list is always
+// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 
behavior ever changed it may become a flaky test
+final List expectedSecondAssignment = 
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
 
-CLUSTER.createTopic("TEST-TOPIC-1");
+CLUSTER.createTopic("TEST-TOPIC-1");
 
-final StreamsBuilder builder = new StreamsBuilder();
+final StreamsBuilder builder = new StreamsBuilder();
 
-final KStream pattern1Stream = 
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+final KStream pattern1Stream = 
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
 
-pattern1Stream.to(outputTopic, Produced.with(stringSerde, 
stringSerde));
-final List assignedTopics = new CopyOnWriteArrayList<>();
-streams = new KafkaStreams(builder.build(), streamsConfiguration, new 
DefaultKafkaClientSupplier() {
-@Override
-public Consumer getConsumer(final Map config) {
-return new KafkaConsumer(config, new 
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
-@Override
-public void subscribe(final Pattern topics, final 
ConsumerRebalanceListener listener) {
-super.subscribe(topics, new 
TheConsumerRebalanceListener(assignedTopics, listener));
-}
-};
+pattern1Stream.to(outputTopic, Produced.with(stringSerde, 
stringSerde));
+final List assignedTopics = new CopyOnWriteArrayList<>();
+streams = new KafkaStreams(builder.build(), streamsConfiguration, 
new DefaultKafkaClientSupplier() {
+@Override
+public Consumer getConsumer(final Map config) {
+return new KafkaConsumer(config, new 
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
+@Override
+public void subscribe(final Pattern topics, final 
ConsumerRebalanceListener listener) {
+super.subscribe(topics, new 
TheConsumerRebalanceListener(assignedTopics, listener));
+}
+};
 
-}
-});
+}
+});
 
-streams.start();
-TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
+streams.start();
+TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
 
-CLUSTER.createTopic("TEST-TOPIC-2");
+CLUSTER.createTopic("TEST-TOPIC-2");
 
-TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
+TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
 
-streams.close();
-CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+streams.close();

Review comment:
   Well, won't we end up deleting the topics before closing it if we never 
reach the first `streams.close`? Or does it not really matter in that case, 
since something has already gone wrong? (Just curious, I'm fine with it as-is, 
btw.)
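
   To make the ordering question concrete, here is a minimal sketch in plain 
Java. `FakeStreams` and `runTest` are hypothetical stand-ins, not the Kafka 
test classes, and the outer `finally` only imitates an `@After` method. It 
records which cleanup step runs first when the body throws before it reaches 
`close()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; nothing here is part of the Kafka test utilities.
final class CleanupOrderSketch {

    static final class FakeStreams {
        private final List<String> log;
        private boolean closed = false;

        FakeStreams(final List<String> log) {
            this.log = log;
        }

        // Idempotent close: only the first call is recorded.
        void close() {
            if (!closed) {
                closed = true;
                log.add("streams.close");
            }
        }
    }

    // Runs a fake test body; if failEarly is true it throws before close().
    static List<String> runTest(final boolean failEarly) {
        final List<String> log = new ArrayList<>();
        final FakeStreams streams = new FakeStreams(log);
        try {
            try {
                if (failEarly) {
                    throw new IllegalStateException("condition not met");
                }
                streams.close();
            } finally {
                // Topic deletion runs whether or not close() was reached.
                log.add("deleteTopics");
            }
        } catch (final IllegalStateException e) {
            log.add("test failed");
        } finally {
            // Stand-in for an @After method that closes the client.
            streams.close();
        }
        return log;
    }

    public static void main(final String[] args) {
        System.out.println(runTest(false));
        System.out.println(runTest(true));
    }
}
```

   In the failing case the topics are deleted while the client is still open, 
and the client is only closed afterwards by the tear-down step.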









[jira] [Commented] (KAFKA-10097) Avoid getting null map for task checkpoint

2020-06-04 Thread feyman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126244#comment-17126244
 ] 

feyman commented on KAFKA-10097:


Cool, thanks a lot! [~bchen225242]

> Avoid getting null map for task checkpoint
> --
>
> Key: KAFKA-10097
> URL: https://issues.apache.org/jira/browse/KAFKA-10097
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>
> In StreamTask, we have logic to generate a checkpoint offset map to be 
> materialized through StateManager#checkpoint. This map can be either an empty 
> map or null: the former indicates to only pull down the existing state store 
> checkpoint data, while the latter indicates that no checkpoint is needed, for 
> example when we are suspending a task.
> Having two similar special cases for checkpointing could lead to unexpected 
> bugs; we should also think about separating the empty checkpoint case from the 
> passed-in checkpoint case.
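
One possible way to separate the cases described above, sketched in plain Java. 
The class and method names are hypothetical and are not the StateManager API; 
the idea is only that "no checkpoint" becomes "do not call anything", so 
neither null nor a special empty map is needed as a signal.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not the actual StateManager or StreamTask code.
final class CheckpointSketch {
    private final Map<String, Long> checkpointed = new HashMap<>();
    private int writes = 0;

    // Case 1: write only what the stores already know (formerly "empty map").
    void checkpointStoreOffsets() {
        checkpoint(Collections.emptyMap());
    }

    // Case 2: merge the passed-in offsets, then write.
    void checkpoint(final Map<String, Long> offsets) {
        checkpointed.putAll(offsets);
        writes++;
    }

    // Case 3 (formerly null): the caller simply does not call either method.

    int writes() {
        return writes;
    }

    Map<String, Long> checkpointed() {
        return Collections.unmodifiableMap(checkpointed);
    }

    public static void main(final String[] args) {
        final CheckpointSketch sketch = new CheckpointSketch();
        sketch.checkpointStoreOffsets();
        sketch.checkpoint(Collections.singletonMap("changelog-0", 5L));
        System.out.println(sketch.writes() + " " + sketch.checkpointed());
    }
}
```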



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax merged pull request #8771: MINOR: Add explanation for disabling forwarding from value transformers

2020-06-04 Thread GitBox


mjsax merged pull request #8771:
URL: https://github.com/apache/kafka/pull/8771


   







[GitHub] [kafka] mjsax merged pull request #8759: KAFKA-10066: TestOutputTopic should pass record headers into deserializers

2020-06-04 Thread GitBox


mjsax merged pull request #8759:
URL: https://github.com/apache/kafka/pull/8759


   







[jira] [Commented] (KAFKA-9991) Flaky Test KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled

2020-06-04 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126303#comment-17126303
 ] 

Sophie Blee-Goldman commented on KAFKA-9991:


Different test method: 
[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2734/testReport/junit/org.apache.kafka.streams.integration/KTableSourceTopicRestartIntegrationTest/shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration/]
h3. Stacktrace

java.lang.AssertionError: Condition not met within timeout 3. Table did not read all values
    at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
    at org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381)
    at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
    at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378)
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
    at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.assertNumberValuesRead(KTableSourceTopicRestartIntegrationTest.java:205)
    at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration(KTableSourceTopicRestartIntegrationTest.java:186)

> Flaky Test 
> KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled
> -
>
> Key: KAFKA-9991
> URL: https://issues.apache.org/jira/browse/KAFKA-9991
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test, unit-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6280/testReport/junit/org.apache.kafka.streams.integration/KTableSourceTopicRestartIntegrationTest/shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled/]
>  
> h3. Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. Table did not read all values
>     at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>     at org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
>     at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.assertNumberValuesRead(KTableSourceTopicRestartIntegrationTest.java:205)
>     at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled(KTableSourceTopicRestartIntegrationTest.java:159)
>     at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled(KTableSourceTopicRestartIntegrationTest.java:143)





[jira] [Resolved] (KAFKA-10040) Make computing the PreferredReplicaImbalanceCount metric more efficient

2020-06-04 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10040.
-
Fix Version/s: 2.7.0
   Resolution: Fixed

> Make computing the PreferredReplicaImbalanceCount metric more efficient
> ---
>
> Key: KAFKA-10040
> URL: https://issues.apache.org/jira/browse/KAFKA-10040
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.7.0
>
>
> At the moment, computing the PreferredReplicaImbalanceCount metric traverses 
> all the partitions in the cluster to find the imbalanced ones. This is 
> extremely costly in clusters with a large number of partitions, and it is done 
> after the processing of each event in the controller.
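
One way to avoid the full traversal described above is to keep a running count 
that is adjusted whenever a partition's leader changes. The sketch below is a 
hypothetical illustration in plain Java, not the controller code from the 
merged fix; the class name, the string partition keys, and the integer broker 
ids are all assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of an incrementally maintained imbalance count.
final class ImbalanceCounterSketch {
    // partition -> whether its current leader differs from the preferred replica
    private final Map<String, Boolean> imbalanced = new HashMap<>();
    private int count = 0;

    // Called on each leadership change; O(1) instead of a scan of all partitions.
    void update(final String partition, final int leader, final int preferredReplica) {
        final boolean now = leader != preferredReplica;
        final Boolean before = imbalanced.put(partition, now);
        if (before != null && before) {
            count--;
        }
        if (now) {
            count++;
        }
    }

    // Called when a partition is deleted.
    void remove(final String partition) {
        final Boolean before = imbalanced.remove(partition);
        if (before != null && before) {
            count--;
        }
    }

    // Reading the metric is a field read.
    int count() {
        return count;
    }

    public static void main(final String[] args) {
        final ImbalanceCounterSketch counter = new ImbalanceCounterSketch();
        counter.update("topic-0", 1, 1);
        counter.update("topic-1", 2, 1);
        System.out.println(counter.count());
    }
}
```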





[GitHub] [kafka] hachikuji merged pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

2020-06-04 Thread GitBox


hachikuji merged pull request #8724:
URL: https://github.com/apache/kafka/pull/8724


   







[GitHub] [kafka] hachikuji commented on pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-06-04 Thread GitBox


hachikuji commented on pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#issuecomment-639211997


   ok to test







[GitHub] [kafka] mjsax commented on a change in pull request #8799: KAFKA-8011: Fix flaky RegexSourceIntegrationTest

2020-06-04 Thread GitBox


mjsax commented on a change in pull request #8799:
URL: https://github.com/apache/kafka/pull/8799#discussion_r435583650



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##
@@ -142,83 +142,89 @@ public void tearDown() throws IOException {
 
 @Test
 public void testRegexMatchesTopicsAWhenCreated() throws Exception {
+try {
+final Serde stringSerde = Serdes.String();
 
-final Serde stringSerde = Serdes.String();
-
-final List expectedFirstAssignment = 
Collections.singletonList("TEST-TOPIC-1");
-// we compare lists of subscribed topics and hence requiring the order 
as well; this is guaranteed
-// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add 
TEST-TOPIC-2 so the list is always
-// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 
behavior ever changed it may become a flaky test
-final List expectedSecondAssignment = 
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
+final List expectedFirstAssignment = 
Collections.singletonList("TEST-TOPIC-1");
+// we compare lists of subscribed topics and hence requiring the 
order as well; this is guaranteed
+// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only 
add TEST-TOPIC-2 so the list is always
+// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 
behavior ever changed it may become a flaky test
+final List expectedSecondAssignment = 
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
 
-CLUSTER.createTopic("TEST-TOPIC-1");
+CLUSTER.createTopic("TEST-TOPIC-1");
 
-final StreamsBuilder builder = new StreamsBuilder();
+final StreamsBuilder builder = new StreamsBuilder();
 
-final KStream pattern1Stream = 
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+final KStream pattern1Stream = 
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
 
-pattern1Stream.to(outputTopic, Produced.with(stringSerde, 
stringSerde));
-final List assignedTopics = new CopyOnWriteArrayList<>();
-streams = new KafkaStreams(builder.build(), streamsConfiguration, new 
DefaultKafkaClientSupplier() {
-@Override
-public Consumer getConsumer(final Map config) {
-return new KafkaConsumer(config, new 
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
-@Override
-public void subscribe(final Pattern topics, final 
ConsumerRebalanceListener listener) {
-super.subscribe(topics, new 
TheConsumerRebalanceListener(assignedTopics, listener));
-}
-};
+pattern1Stream.to(outputTopic, Produced.with(stringSerde, 
stringSerde));
+final List assignedTopics = new CopyOnWriteArrayList<>();
+streams = new KafkaStreams(builder.build(), streamsConfiguration, 
new DefaultKafkaClientSupplier() {
+@Override
+public Consumer getConsumer(final Map config) {
+return new KafkaConsumer(config, new 
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
+@Override
+public void subscribe(final Pattern topics, final 
ConsumerRebalanceListener listener) {
+super.subscribe(topics, new 
TheConsumerRebalanceListener(assignedTopics, listener));
+}
+};
 
-}
-});
+}
+});
 
-streams.start();
-TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
+streams.start();
+TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
 
-CLUSTER.createTopic("TEST-TOPIC-2");
+CLUSTER.createTopic("TEST-TOPIC-2");
 
-TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
+TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
 
-streams.close();
-CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+streams.close();

Review comment:
   I don't think that is necessary -- there is an `@After` method that 
closes the client for us.









[GitHub] [kafka] ableegoldman commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness

2020-06-04 Thread GitBox


ableegoldman commented on a change in pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#discussion_r435585609



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##
@@ -302,14 +300,19 @@ public void computeTaskLags(final UUID uuid, final 
Map allTaskEndO
  * @return end offset sum - offset sum
  *  Task.LATEST_OFFSET if this was previously an active running 
task on this client
  */
-long lagFor(final TaskId task) {
-final Long totalLag = taskLagTotals.get(task);
+public long lagFor(final TaskId task) {
+final Long totalLag;
+if (taskLagTotals.isEmpty()) {
+// If we couldn't compute the task lags due to failure to fetch 
offsets, just return a flat constant
+totalLag = 0L;

Review comment:
   The value itself doesn't matter, just that it's constant across all 
tasks.
   
   But I'm guessing you meant, why not use the existing `UNKNOWN_OFFSET_SUM` 
sentinel, in which case the answer is probably just that I forgot about it. 
Anyway I did a slight additional refactoring beyond this, just fyi: instead of 
skipping the lag computation when we fail to fetch offsets, we now always 
initialize the lags and just pass in the `UNKNOWN_OFFSET_SUM` for all stateful 
tasks when the offset fetch fails.
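
   A simplified sketch of that refactoring in plain Java, for illustration 
only. The names, the string task ids, the use of `null` to model a failed 
fetch, and the sentinel's numeric value are all assumptions of this sketch and 
not the actual `ClientState` code. The point is that every stateful task 
always gets an entry, and all entries share one constant when the end offsets 
are unavailable.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; not the real ClientState implementation.
final class TaskLagSketch {
    // Illustrative sentinel; the real constant's value is not shown in this thread.
    static final long UNKNOWN_OFFSET_SUM = -3L;

    // endOffsetSums == null models a failed offset fetch (assumption of this sketch).
    static Map<String, Long> computeTaskLags(final Set<String> statefulTasks,
                                             final Map<String, Long> endOffsetSums,
                                             final Map<String, Long> offsetSums) {
        final Map<String, Long> lags = new HashMap<>();
        for (final String task : statefulTasks) {
            if (endOffsetSums == null) {
                // Same constant for every task, so no task looks more caught up.
                lags.put(task, UNKNOWN_OFFSET_SUM);
            } else {
                final long end = endOffsetSums.getOrDefault(task, 0L);
                final long current = offsetSums.getOrDefault(task, 0L);
                lags.put(task, Math.max(0L, end - current));
            }
        }
        return lags;
    }
}
```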









[GitHub] [kafka] ableegoldman commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

2020-06-04 Thread GitBox


ableegoldman commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r435590436



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -562,23 +564,18 @@ private void restoreChangelog(final ChangelogMetadata 
changelogMetadata) {
 }
 
 private Map committedOffsetForChangelogs(final 
Set partitions) {
-if (partitions.isEmpty())
-return Collections.emptyMap();
-

Review comment:
   The diff is a bit misleading: this was also factored out into the new 
`ClientUtils#fetchCommittedOffsets`.









[GitHub] [kafka] mjsax commented on pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-04 Thread GitBox


mjsax commented on pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#issuecomment-639154057


   > I'm mildly concerned that we're using the state to implement idempotence 
of processing steps, but the states aren't 1:1 with the steps. For example, 
when we go from RUNNING to CLOSED, we transition through both prepareClose() 
and close(). But there's no CLOSE_PREPARED state, so there's no way to really 
differentiate that specifically prepareClose() has completed before, but not 
close().
   
   Correct. Note that there will be more follow-up PRs that will remove, e.g., 
`prepareClose()`, so this will be addressed soon.
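
   The concern quoted above can be reproduced in a few lines of plain Java. 
This is a hypothetical two-state toy, not `StreamTask`: because 
`prepareClose()` does not change the state, a state check alone cannot tell 
whether it has already run, whereas `close()` is naturally idempotent.

```java
// Hypothetical toy state machine; not the real StreamTask.
final class TaskStateSketch {
    enum State { RUNNING, CLOSED }

    private State state = State.RUNNING;
    private int prepareCount = 0;

    // Guarded only by the state: a second call while RUNNING repeats the work,
    // because nothing records that the step has already completed.
    void prepareClose() {
        if (state == State.RUNNING) {
            prepareCount++;
        }
    }

    // This step transitions the state, so repeating it is a no-op.
    void close() {
        if (state == State.RUNNING) {
            state = State.CLOSED;
        }
    }

    State state() {
        return state;
    }

    int prepareCount() {
        return prepareCount;
    }

    public static void main(final String[] args) {
        final TaskStateSketch task = new TaskStateSketch();
        task.prepareClose();
        task.prepareClose();
        task.close();
        System.out.println(task.prepareCount() + " " + task.state());
    }
}
```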







[GitHub] [kafka] mjsax commented on a change in pull request #8799: KAFKA-8011: Fix flaky RegexSourceIntegrationTest

2020-06-04 Thread GitBox


mjsax commented on a change in pull request #8799:
URL: https://github.com/apache/kafka/pull/8799#discussion_r435597397



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##
@@ -142,83 +142,89 @@ public void tearDown() throws IOException {
 
 @Test
 public void testRegexMatchesTopicsAWhenCreated() throws Exception {
+try {
+final Serde stringSerde = Serdes.String();
 
-final Serde stringSerde = Serdes.String();
-
-final List expectedFirstAssignment = 
Collections.singletonList("TEST-TOPIC-1");
-// we compare lists of subscribed topics and hence requiring the order 
as well; this is guaranteed
-// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add 
TEST-TOPIC-2 so the list is always
-// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 
behavior ever changed it may become a flaky test
-final List expectedSecondAssignment = 
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
+final List expectedFirstAssignment = 
Collections.singletonList("TEST-TOPIC-1");
+// we compare lists of subscribed topics and hence requiring the 
order as well; this is guaranteed
+// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only 
add TEST-TOPIC-2 so the list is always
+// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 
behavior ever changed it may become a flaky test
+final List expectedSecondAssignment = 
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
 
-CLUSTER.createTopic("TEST-TOPIC-1");
+CLUSTER.createTopic("TEST-TOPIC-1");
 
-final StreamsBuilder builder = new StreamsBuilder();
+final StreamsBuilder builder = new StreamsBuilder();
 
-final KStream pattern1Stream = 
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+final KStream pattern1Stream = 
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
 
-pattern1Stream.to(outputTopic, Produced.with(stringSerde, 
stringSerde));
-final List assignedTopics = new CopyOnWriteArrayList<>();
-streams = new KafkaStreams(builder.build(), streamsConfiguration, new 
DefaultKafkaClientSupplier() {
-@Override
-public Consumer getConsumer(final Map config) {
-return new KafkaConsumer(config, new 
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
-@Override
-public void subscribe(final Pattern topics, final 
ConsumerRebalanceListener listener) {
-super.subscribe(topics, new 
TheConsumerRebalanceListener(assignedTopics, listener));
-}
-};
+pattern1Stream.to(outputTopic, Produced.with(stringSerde, 
stringSerde));
+final List assignedTopics = new CopyOnWriteArrayList<>();
+streams = new KafkaStreams(builder.build(), streamsConfiguration, 
new DefaultKafkaClientSupplier() {
+@Override
+public Consumer getConsumer(final Map config) {
+return new KafkaConsumer(config, new 
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
+@Override
+public void subscribe(final Pattern topics, final 
ConsumerRebalanceListener listener) {
+super.subscribe(topics, new 
TheConsumerRebalanceListener(assignedTopics, listener));
+}
+};
 
-}
-});
+}
+});
 
-streams.start();
-TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
+streams.start();
+TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
 
-CLUSTER.createTopic("TEST-TOPIC-2");
+CLUSTER.createTopic("TEST-TOPIC-2");
 
-TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
+TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
 
-streams.close();
-CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+streams.close();

Review comment:
   Yes, that is my reasoning.









[GitHub] [kafka] mjsax commented on pull request #8759: KAFKA-10066: TestOutputTopic should pass record headers into deserializers

2020-06-04 Thread GitBox


mjsax commented on pull request #8759:
URL: https://github.com/apache/kafka/pull/8759#issuecomment-639170115


   Merged to `trunk` and cherry-picked to `2.6` and `2.5` branches.







[GitHub] [kafka] ableegoldman commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

2020-06-04 Thread GitBox


ableegoldman commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-639184354


   Java8 failed with 
   
`KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration`
   
   Java14 failed with 
`KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled`
   
   I've seen both of these be flaky already (and frankly am a bit concerned 
about them...), but I'll see if I can reproduce this locally in case this PR is 
somehow making them worse.







[GitHub] [kafka] vitojeng commented on pull request #8789: MINOR: Fix the javadoc broken links of streams

2020-06-04 Thread GitBox


vitojeng commented on pull request #8789:
URL: https://github.com/apache/kafka/pull/8789#issuecomment-639186748


   Thanks @bbejeck .







[GitHub] [kafka] guozhangwang commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-04 Thread GitBox


guozhangwang commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r435636306



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -247,48 +251,75 @@ public void completeRestoration() {
  */
 @Override
 public void prepareSuspend() {
-if (state() == State.CREATED || state() == State.SUSPENDED) {
-// do nothing
-log.trace("Skip prepare suspending since state is {}", state());
-} else if (state() == State.RUNNING) {
-closeTopology(true);
+switch (state()) {

Review comment:
   Yup, makes sense, just curious :)









[jira] [Commented] (KAFKA-9991) Flaky Test KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled

2020-06-04 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126302#comment-17126302
 ] 

Sophie Blee-Goldman commented on KAFKA-9991:


[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/898/testReport/junit/org.apache.kafka.streams.integration/KTableSourceTopicRestartIntegrationTest/shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled/]
h3. Stacktrace

java.lang.AssertionError: Condition not met within timeout 3. Table did not read all values
    at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
    at org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381)
    at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
    at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378)
    at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
    at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.assertNumberValuesRead(KTableSourceTopicRestartIntegrationTest.java:205)
    at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled(KTableSourceTopicRestartIntegrationTest.java:159)
    at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled(KTableSourceTopicRestartIntegrationTest.java:143)

> Flaky Test 
> KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled
> -
>
> Key: KAFKA-9991
> URL: https://issues.apache.org/jira/browse/KAFKA-9991
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test, unit-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6280/testReport/junit/org.apache.kafka.streams.integration/KTableSourceTopicRestartIntegrationTest/shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled/]
>  
> h3. Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. Table did not read all values
>     at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>     at org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
>     at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.assertNumberValuesRead(KTableSourceTopicRestartIntegrationTest.java:205)
>     at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled(KTableSourceTopicRestartIntegrationTest.java:159)
>     at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled(KTableSourceTopicRestartIntegrationTest.java:143)





[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-04 Thread GitBox


mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r435588028



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final 
Map changelogEntry : 
changelogOffsets.entrySet()) {
 final long offset = changelogEntry.getValue();
 
-offsetSum += offset;
-if (offsetSum < 0) {
-log.warn("Sum of changelog offsets for task {} overflowed, 
pinning to Long.MAX_VALUE", id);
-return Long.MAX_VALUE;
+if (offset == Task.LATEST_OFFSET) {

Review comment:
   Sure, can add a comment.









[GitHub] [kafka] ableegoldman commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

2020-06-04 Thread GitBox


ableegoldman commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r435593477



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -562,23 +564,18 @@ private void restoreChangelog(final ChangelogMetadata 
changelogMetadata) {
 }
 
 private Map committedOffsetForChangelogs(final 
Set partitions) {
-if (partitions.isEmpty())
-return Collections.emptyMap();
-
 final Map committedOffsets;
 try {
-// those do not have a committed offset would default to 0
-committedOffsets =  
mainConsumer.committed(partitions).entrySet().stream()
-.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() 
== null ? 0L : e.getValue().offset()));
-} catch (final TimeoutException e) {
-// if it timed out we just retry next time.
-return Collections.emptyMap();
-} catch (final KafkaException e) {
-throw new StreamsException(String.format("Failed to retrieve end 
offsets for %s", partitions), e);
+committedOffsets = fetchCommittedOffsets(partitions, mainConsumer);
+} catch (final StreamsException e) {
+if (e.getCause() instanceof TimeoutException) {

Review comment:
   I thought this might raise some eyebrows. I wanted to keep the 
ClientUtils methods consistent, and thought wrapping everything as a 
StreamsException would be cleaner. But maybe it makes more sense to throw the 
TimeoutException separately...
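
The two options weighed in this comment can be sketched side by side. This is only an illustration under stated assumptions: `FakeTimeoutException` and `FakeStreamsException` are hypothetical stand-ins for Kafka's `TimeoutException` and `StreamsException` (so the snippet compiles without the Kafka jars), and the method names are invented for the example rather than taken from the PR.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-ins, NOT the real Kafka exception classes.
class FakeTimeoutException extends RuntimeException {
    FakeTimeoutException(String message) { super(message); }
}

class FakeStreamsException extends RuntimeException {
    FakeStreamsException(String message, Throwable cause) { super(message, cause); }
}

final class WrappedTimeoutSketch {
    // Option 1: the helper wraps every failure, so the caller has to
    // inspect the cause to tell a retriable timeout from a fatal error.
    static Map<String, Long> emptyOnWrappedTimeout(Supplier<Map<String, Long>> fetch) {
        try {
            return fetch.get();
        } catch (FakeStreamsException e) {
            if (e.getCause() instanceof FakeTimeoutException) {
                return Collections.emptyMap(); // timed out: retry next time
            }
            throw e; // anything else stays fatal
        }
    }

    // Option 2: the helper lets the timeout escape unwrapped, so the
    // caller can catch it directly and never looks at causes.
    static Map<String, Long> emptyOnDirectTimeout(Supplier<Map<String, Long>> fetch) {
        try {
            return fetch.get();
        } catch (FakeTimeoutException e) {
            return Collections.emptyMap(); // timed out: retry next time
        }
    }
}
```

Option 2 keeps the retry decision in the type system, at the cost of the helper methods no longer throwing a single exception type.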









[jira] [Assigned] (KAFKA-10102) Source node references not updated after rebuilding topology

2020-06-04 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-10102:
---

Assignee: (was: Sophie Blee-Goldman)

> Source node references not updated after rebuilding topology
> 
>
>                 Key: KAFKA-10102
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10102
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Sophie Blee-Goldman
>            Priority: Blocker
>             Fix For: 2.6.0
>
>
> Luckily this bug was caught by 
> RegexSourceIntegrationTest#testRegexRecordsAreProcessedAfterReassignment – we 
> saw it fail with an NPE during SourceNode#deserializeKey, implying that the 
> key deserializer was null which in turn implies that the source node was 
> never initialized.
> This can happen when a task is updated with new regex matched topic 
> partitions. In order to update the topology with the new input partitions, we 
> actually just create an entirely new topology from scratch including building 
> new source node objects. We then re/initialize this new topology once the 
> task is resumed.
> The problem is that the task's RecordQueues save a reference to the 
> corresponding source node, and use this to pass polled records into the 
> topology. But the RecordQueues aren't updated with the newly built source 
> nodes and still point to the original nodes.
> If the task had not completed restoration before being updated with new 
> partitions, it would never have initialized the original topology or source 
> nodes, resulting in an NPE when the RecordQueue passes a record to the old, 
> uninitialized source node.
> This is the only specific known bug, but I haven't checked the entire code 
> base so it's possible there are other node references saved that might result 
> in bugs. We should try and avoid rebuilding an entirely new topology if at 
> all possible, and see if we can just update the input partitions only where 
> necessary
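
The stale-reference failure described above can be reproduced in miniature. The sketch below is a simplified model, not the Streams code: `Node` and `Queue` are hypothetical stand-ins for a source node and a RecordQueue, and `setSource` is an invented hook used only to show that re-pointing the queue avoids the NPE. It is not the fix proposed in the ticket.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

final class StaleReferenceSketch {
    // Hypothetical source node: the deserializer only exists after init().
    static final class Node {
        private Function<byte[], String> keyDeserializer; // null until init()

        void init() {
            keyDeserializer = bytes -> new String(bytes, StandardCharsets.UTF_8);
        }

        String deserializeKey(byte[] rawKey) {
            return keyDeserializer.apply(rawKey); // NPE if init() never ran
        }
    }

    // Hypothetical record queue: it saves a reference to its source node.
    static final class Queue {
        private Node source;

        Queue(Node source) { this.source = source; }

        void setSource(Node source) { this.source = source; }

        String poll(byte[] rawKey) { return source.deserializeKey(rawKey); }
    }

    public static void main(String[] args) {
        Node original = new Node();        // never initialized (still restoring)
        Queue queue = new Queue(original);

        Node rebuilt = new Node();         // topology rebuilt from scratch
        rebuilt.init();                    // only the new node gets initialized

        byte[] key = "k".getBytes(StandardCharsets.UTF_8);
        try {
            queue.poll(key);               // still routed to the old node
        } catch (NullPointerException e) {
            System.out.println("stale reference caused an NPE");
        }

        queue.setSource(rebuilt);          // re-point the queue at the new node
        System.out.println(queue.poll(key));
    }
}
```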





[jira] [Assigned] (KAFKA-9923) Join window store duplicates can be compacted in changelog

2020-06-04 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-9923:
--

Assignee: Bruno Cadonna

> Join window store duplicates can be compacted in changelog 
> ---
>
>                 Key: KAFKA-9923
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9923
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Sophie Blee-Goldman
>            Assignee: Bruno Cadonna
>            Priority: Blocker
>             Fix For: 2.6.0
>
>
> Stream-stream joins use the regular `WindowStore` implementation but with 
> `retainDuplicates` set to true. To allow for duplicates while using the same 
> unique-key underlying stores we just wrap the key with an incrementing 
> sequence number before inserting it.
> This wrapping occurs at the innermost layer of the store hierarchy, which 
> means the duplicates must first pass through the changelogging layer. At this 
> point the keys are still identical. So, we end up sending the records to the 
> changelog without distinct keys and therefore may lose the older of the 
> duplicates during compaction.
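
A toy model of the layering problem described above, under loud assumptions: both the changelog and the inner store are plain maps, the changelog map stands for the topic's state after compaction has fully run, and the `key#seqnum` wrapping format is invented for the example. None of this is the actual `WindowStore` code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class DuplicateCompactionSketch {
    // State of the changelog topic once compaction has run: one value per key.
    final Map<String, String> compactedChangelog = new LinkedHashMap<>();
    // Innermost store: unique keys, duplicates kept apart by a sequence number.
    final Map<String, String> innerStore = new LinkedHashMap<>();
    private int seqnum = 0;

    void put(String key, String value) {
        // The changelogging layer sits above the wrapping, so it logs the
        // raw key; a later duplicate overwrites the earlier one on compaction.
        compactedChangelog.put(key, value);
        // The innermost layer wraps the key, so both duplicates survive here.
        innerStore.put(key + "#" + seqnum++, value);
    }

    public static void main(String[] args) {
        DuplicateCompactionSketch store = new DuplicateCompactionSketch();
        store.put("k", "v1");
        store.put("k", "v2");
        System.out.println("inner store entries: " + store.innerStore.size());
        System.out.println("changelog entries:   " + store.compactedChangelog.size());
    }
}
```

Restoring from the compacted changelog in this model would bring back only the newer duplicate, which is the data loss the ticket describes.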





[jira] [Assigned] (KAFKA-10102) Source node references not updated after rebuilding topology

2020-06-04 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-10102:
---

Assignee: Sophie Blee-Goldman

> Source node references not updated after rebuilding topology
> 
>
>                 Key: KAFKA-10102
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10102
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Sophie Blee-Goldman
>            Assignee: Sophie Blee-Goldman
>            Priority: Blocker
>             Fix For: 2.6.0
>
>
> Luckily this bug was caught by 
> RegexSourceIntegrationTest#testRegexRecordsAreProcessedAfterReassignment – we 
> saw it fail with an NPE during SourceNode#deserializeKey, implying that the 
> key deserializer was null which in turn implies that the source node was 
> never initialized.
> This can happen when a task is updated with new regex matched topic 
> partitions. In order to update the topology with the new input partitions, we 
> actually just create an entirely new topology from scratch including building 
> new source node objects. We then re/initialize this new topology once the 
> task is resumed.
> The problem is that the task's RecordQueues save a reference to the 
> corresponding source node, and use this to pass polled records into the 
> topology. But the RecordQueues aren't updated with the newly built source 
> nodes and still point to the original nodes.
> If the task had not completed restoration before being updated with new 
> partitions, it would never have initialized the original topology or source 
> nodes, resulting in an NPE when the RecordQueue passes a record to the old, 
> uninitialized source node.
> This is the only specific known bug, but I haven't checked the entire code 
> base so it's possible there are other node references saved that might result 
> in bugs. We should try and avoid rebuilding an entirely new topology if at 
> all possible, and see if we can just update the input partitions only where 
> necessary





[GitHub] [kafka] hachikuji commented on pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

2020-06-04 Thread GitBox


hachikuji commented on pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#issuecomment-639189300


   I will go ahead and merge this. I built locally with JDK11 and Scala 2.13.







[GitHub] [kafka] ableegoldman commented on pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

2020-06-04 Thread GitBox


ableegoldman commented on pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#issuecomment-639205947


   Hey @vvcephei, I addressed your comments and added tests.  Let me know if 
there's any test coverage that still seems missing







[GitHub] [kafka] ableegoldman commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

2020-06-04 Thread GitBox


ableegoldman commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r435647575



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -763,18 +778,36 @@ private boolean populateClientStatesMap(final Map clientState
 .flatMap(Collection::stream)
 .collect(Collectors.toList());
 
-final Collection allPreexistingChangelogPartitions 
= new ArrayList<>(allChangelogPartitions);
-allPreexistingChangelogPartitions.removeIf(partition -> 
newlyCreatedChangelogs.contains(partition.topic()));
+final Set preexistingChangelogPartitions = new 
HashSet<>();
+final Set preexistingSourceChangelogPartitions = 
new HashSet<>();
+final Set newlyCreatedChangelogPartitions = new 
HashSet<>();
+for (final TopicPartition changelog : allChangelogPartitions) {
+if (newlyCreatedChangelogs.contains(changelog.topic())) {
+newlyCreatedChangelogPartitions.add(changelog);
+} else if 
(optimizedSourceChangelogs.contains(changelog.topic())) {
+preexistingSourceChangelogPartitions.add(changelog);
+} else {
+preexistingChangelogPartitions.add(changelog);
+}
+}
+
+// Make the listOffsets request first so it can  fetch the offsets 
for non-source changelogs
+// asynchronously while we use the blocking Consumer#committed 
call to fetch source-changelog offsets
+final KafkaFuture> 
endOffsetsFuture =
+fetchEndOffsetsFuture(preexistingChangelogPartitions, 
adminClient);
 
-final Collection 
allNewlyCreatedChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-
allNewlyCreatedChangelogPartitions.removeAll(allPreexistingChangelogPartitions);
+final Map sourceChangelogEndOffsets =
+fetchCommittedOffsets(preexistingSourceChangelogPartitions, 
taskManager.mainConsumer());
 
-final Map endOffsets =
-fetchEndOffsets(allPreexistingChangelogPartitions, 
adminClient);
+final Map endOffsets = 
ClientUtils.getEndOffsets(endOffsetsFuture);
 
-allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, 
changelogsByStatefulTask, allNewlyCreatedChangelogPartitions);
+allTaskEndOffsetSums = computeEndOffsetSumsByTask(
+changelogsByStatefulTask,
+endOffsets,
+sourceChangelogEndOffsets,
+newlyCreatedChangelogPartitions);
 fetchEndOffsetsSuccessful = true;
-} catch (final StreamsException e) {
+} catch (final StreamsException | TimeoutException e) {

Review comment:
   @vvcephei I've been wondering if maybe we should _only_ catch the 
TimeoutException, and interpret a StreamsException as fatal (like 
IllegalStateException, for example). This is how we were using 
`Consumer#committed` in the StoreChangelogReader, and AFAICT that only throws 
KafkaException on "unrecoverable errors" (quoted from the javadocs).
   But I can't tell whether the Admin's `listOffsets` might throw on transient 
errors, so I'm leaning towards catching both just to be safe. WDYT?
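
The request ordering that the diff comment above describes (start the asynchronous end-offset lookup first, then make the blocking committed-offset call while it is in flight) can be sketched with plain JDK types. Everything here is an assumption made for illustration: `CompletableFuture` stands in for `KafkaFuture`, the two suppliers stand in for the admin and consumer calls, and partitions are modelled as strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class OverlappedOffsetFetchSketch {
    static Map<String, Long> fetchAll(
            Supplier<Map<String, Long>> asyncEndOffsets,
            Supplier<Map<String, Long>> blockingCommitted) {
        // Start the asynchronous request first so it runs in the background ...
        CompletableFuture<Map<String, Long>> endOffsetsFuture =
            CompletableFuture.supplyAsync(asyncEndOffsets);

        // ... while the calling thread is busy with the blocking call.
        Map<String, Long> committed = blockingCommitted.get();

        // Only now wait for the asynchronous result and merge both maps.
        // join() rethrows a failure of the async task as CompletionException.
        Map<String, Long> all = new HashMap<>(endOffsetsFuture.join());
        all.putAll(committed);
        return all;
    }
}
```

With this ordering the total wait is roughly the longer of the two calls instead of their sum; which exceptions the caller should treat as retriable is the open question in the comment and is not settled by the sketch.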









[GitHub] [kafka] ableegoldman commented on a change in pull request #8799: KAFKA-8011: Fix flaky RegexSourceIntegrationTest

2020-06-04 Thread GitBox


ableegoldman commented on a change in pull request #8799:
URL: https://github.com/apache/kafka/pull/8799#discussion_r435584438



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##
@@ -142,83 +142,89 @@ public void tearDown() throws IOException {
 
 @Test
 public void testRegexMatchesTopicsAWhenCreated() throws Exception {
+try {
+final Serde stringSerde = Serdes.String();
 
-final Serde stringSerde = Serdes.String();
-
-final List expectedFirstAssignment = 
Collections.singletonList("TEST-TOPIC-1");
-// we compare lists of subscribed topics and hence requiring the order 
as well; this is guaranteed
-// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add 
TEST-TOPIC-2 so the list is always
-// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 
behavior ever changed it may become a flaky test
-final List expectedSecondAssignment = 
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
+final List expectedFirstAssignment = 
Collections.singletonList("TEST-TOPIC-1");
+// we compare lists of subscribed topics and hence requiring the 
order as well; this is guaranteed
+// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only 
add TEST-TOPIC-2 so the list is always
+// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 
behavior ever changed it may become a flaky test
+final List expectedSecondAssignment = 
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
 
-CLUSTER.createTopic("TEST-TOPIC-1");
+CLUSTER.createTopic("TEST-TOPIC-1");
 
-final StreamsBuilder builder = new StreamsBuilder();
+final StreamsBuilder builder = new StreamsBuilder();
 
-final KStream pattern1Stream = 
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+final KStream pattern1Stream = 
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
 
-pattern1Stream.to(outputTopic, Produced.with(stringSerde, 
stringSerde));
-final List assignedTopics = new CopyOnWriteArrayList<>();
-streams = new KafkaStreams(builder.build(), streamsConfiguration, new 
DefaultKafkaClientSupplier() {
-@Override
-public Consumer getConsumer(final Map config) {
-return new KafkaConsumer(config, new 
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
-@Override
-public void subscribe(final Pattern topics, final 
ConsumerRebalanceListener listener) {
-super.subscribe(topics, new 
TheConsumerRebalanceListener(assignedTopics, listener));
-}
-};
+pattern1Stream.to(outputTopic, Produced.with(stringSerde, 
stringSerde));
+final List assignedTopics = new CopyOnWriteArrayList<>();
+streams = new KafkaStreams(builder.build(), streamsConfiguration, 
new DefaultKafkaClientSupplier() {
+@Override
+public Consumer getConsumer(final Map config) {
+return new KafkaConsumer(config, new 
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
+@Override
+public void subscribe(final Pattern topics, final 
ConsumerRebalanceListener listener) {
+super.subscribe(topics, new 
TheConsumerRebalanceListener(assignedTopics, listener));
+}
+};
 
-}
-});
+}
+});
 
-streams.start();
-TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
+streams.start();
+TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
 
-CLUSTER.createTopic("TEST-TOPIC-2");
+CLUSTER.createTopic("TEST-TOPIC-2");
 
-TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
+TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
 
-streams.close();
-CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+streams.close();

Review comment:
   Then why close it here as well?









[GitHub] [kafka] mjsax commented on a change in pull request #8799: KAFKA-8011: Fix flaky RegexSourceIntegrationTest

2020-06-04 Thread GitBox


mjsax commented on a change in pull request #8799:
URL: https://github.com/apache/kafka/pull/8799#discussion_r435587017



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##
@@ -142,83 +142,89 @@ public void tearDown() throws IOException {
 
 @Test
 public void testRegexMatchesTopicsAWhenCreated() throws Exception {
+try {
+final Serde stringSerde = Serdes.String();
 
-final Serde stringSerde = Serdes.String();
-
-final List expectedFirstAssignment = 
Collections.singletonList("TEST-TOPIC-1");
-// we compare lists of subscribed topics and hence requiring the order 
as well; this is guaranteed
-// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add 
TEST-TOPIC-2 so the list is always
-// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 
behavior ever changed it may become a flaky test
-final List expectedSecondAssignment = 
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
+final List expectedFirstAssignment = 
Collections.singletonList("TEST-TOPIC-1");
+// we compare lists of subscribed topics and hence requiring the 
order as well; this is guaranteed
+// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only 
add TEST-TOPIC-2 so the list is always
+// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 
behavior ever changed it may become a flaky test
+final List expectedSecondAssignment = 
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
 
-CLUSTER.createTopic("TEST-TOPIC-1");
+CLUSTER.createTopic("TEST-TOPIC-1");
 
-final StreamsBuilder builder = new StreamsBuilder();
+final StreamsBuilder builder = new StreamsBuilder();
 
-final KStream pattern1Stream = 
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+final KStream pattern1Stream = 
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
 
-pattern1Stream.to(outputTopic, Produced.with(stringSerde, 
stringSerde));
-final List assignedTopics = new CopyOnWriteArrayList<>();
-streams = new KafkaStreams(builder.build(), streamsConfiguration, new 
DefaultKafkaClientSupplier() {
-@Override
-public Consumer getConsumer(final Map config) {
-return new KafkaConsumer(config, new 
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
-@Override
-public void subscribe(final Pattern topics, final 
ConsumerRebalanceListener listener) {
-super.subscribe(topics, new 
TheConsumerRebalanceListener(assignedTopics, listener));
-}
-};
+pattern1Stream.to(outputTopic, Produced.with(stringSerde, 
stringSerde));
+final List assignedTopics = new CopyOnWriteArrayList<>();
+streams = new KafkaStreams(builder.build(), streamsConfiguration, 
new DefaultKafkaClientSupplier() {
+@Override
+public Consumer getConsumer(final Map config) {
+return new KafkaConsumer(config, new 
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
+@Override
+public void subscribe(final Pattern topics, final 
ConsumerRebalanceListener listener) {
+super.subscribe(topics, new 
TheConsumerRebalanceListener(assignedTopics, listener));
+}
+};
 
-}
-});
+}
+});
 
-streams.start();
-TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
+streams.start();
+TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
 
-CLUSTER.createTopic("TEST-TOPIC-2");
+CLUSTER.createTopic("TEST-TOPIC-2");
 
-TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
+TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
 
-streams.close();
-CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+streams.close();

Review comment:
   I think it's better to first close it before we delete the topics.
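
The ordering argued for here can be shown with a small stand-alone sketch. It assumes the cleanup lives in a `finally` block so that it also runs when the test body fails; the event strings are placeholders for `streams.close()` and `CLUSTER.deleteTopicsAndWait(...)`, and the class and method names are invented for the example.

```java
import java.util.List;

final class CloseBeforeDeleteSketch {
    // Runs the test body and records the cleanup steps in the order they run.
    static void runTest(Runnable testBody, List<String> events) {
        try {
            testBody.run();
        } finally {
            // Stop the client first so nothing is still consuming ...
            events.add("close streams");
            // ... and only then remove the topics it was subscribed to.
            events.add("delete topics");
        }
    }
}
```

Because both steps sit in `finally`, a failed assertion in the body still leaves the cluster clean for the next test.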









[jira] [Commented] (KAFKA-9203) kafka-client 2.3.1 fails to consume lz4 compressed topic

2020-06-04 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126293#comment-17126293
 ] 

Ismael Juma commented on KAFKA-9203:


Hi [~manthanit]. Thanks for providing this information. I was personally quite 
puzzled how the original change caused problems. But I reverted it out of 
caution since it was a clean-up and I wasn't sure if there was a bug in the 
underlying library. [~dwatzke] Is there any chance you were including multiple 
versions of lz4 in the classpath by accident?
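
One hedged way to answer the classpath question is to ask the class loader for every copy of a class file it can see. The sketch below uses the standard `ClassLoader.getResources` call; the resource path `net/jpountz/lz4/LZ4Factory.class` is an assumption about which lz4-java class to probe, and the class and method names are made up for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

final class ClasspathDuplicateSketch {
    // Returns one URL per copy of the resource visible to this class loader.
    static List<URL> copiesOf(String resourcePath) {
        ClassLoader loader = ClasspathDuplicateSketch.class.getClassLoader();
        try {
            return Collections.list(loader.getResources(resourcePath));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Assumed probe class; more than one URL suggests several lz4 jars.
        for (URL url : copiesOf("net/jpountz/lz4/LZ4Factory.class")) {
            System.out.println(url);
        }
    }
}
```

Running this with the consumer application's classpath prints the jar each copy comes from, which makes accidental duplicates easy to spot.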

> kafka-client 2.3.1 fails to consume lz4 compressed topic
> 
>
>                 Key: KAFKA-9203
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9203
>             Project: Kafka
>          Issue Type: Bug
>          Components: compression, consumer
>    Affects Versions: 2.3.0, 2.3.1
>            Reporter: David Watzke
>            Assignee: Ismael Juma
>            Priority: Blocker
>             Fix For: 2.4.0, 2.3.2
>
>         Attachments: kafka-clients-2.3.2-SNAPSHOT.jar
>
>
> I run a Kafka 2.1.1 cluster.
> When I upgraded the consumer app to use kafka-client 2.3.0 (or 2.3.1) instead 
> of 2.2.0, I immediately started getting the following exceptions in a loop 
> when consuming a topic with LZ4-compressed messages:
> {noformat}
> 2019-11-18 11:59:16.888 ERROR [afka-consumer-thread] com.avast.utils2.kafka.consumer.ConsumerThread: Unexpected exception occurred while polling and processing messages: org.apache.kafka.common.KafkaException: Received exception when fetching the next record from FILEREP_LONER_REQUEST-4. If needed, please seek past the record to continue consumption.
> org.apache.kafka.common.KafkaException: Received exception when fetching the next record from FILEREP_LONER_REQUEST-4. If needed, please seek past the record to continue consumption.
>     at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1473)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
>     at com.avast.utils2.kafka.consumer.ConsumerThread.pollAndProcessMessagesFromKafka(ConsumerThread.java:180)
>     at com.avast.utils2.kafka.consumer.ConsumerThread.run(ConsumerThread.java:149)
>     at com.avast.filerep.saver.RequestSaver$.$anonfun$main$8(RequestSaver.scala:28)
>     at com.avast.filerep.saver.RequestSaver$.$anonfun$main$8$adapted(RequestSaver.scala:19)
>     at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>     at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252)
>     at scala.util.control.Exception$Catch.apply(Exception.scala:228)
>     at scala.util.control.Exception$Catch.either(Exception.scala:252)
>     at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
>     at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26)
>     at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26)
>     at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
>     at resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>     at resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>     at resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>     at resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>     at resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>     at resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50)
>     at com.avast.filerep.saver.RequestSaver$.$anonfun$main$6(RequestSaver.scala:19)
>     at com.avast.filerep.saver.RequestSaver$.$anonfun$main$6$adapted(RequestSaver.scala:18)
>     at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>     at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252)
>     at scala.util.control.Exception$Catch.apply(Exception.scala:228)
>     at 

[GitHub] [kafka] guozhangwang commented on pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-04 Thread GitBox


guozhangwang commented on pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#issuecomment-639196728


   test this please







[jira] [Created] (KAFKA-10105) Regression in group coordinator dealing with flaky clients joining while leaving

2020-06-04 Thread William Reynolds (Jira)
William Reynolds created KAFKA-10105:


             Summary: Regression in group coordinator dealing with flaky 
clients joining while leaving
                 Key: KAFKA-10105
                 URL: https://issues.apache.org/jira/browse/KAFKA-10105
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 2.4.1
         Environment: Kafka 1.1.0 on jre 8 on debian 9 in docker
Kafka 2.4.1 on jre 11 on debian 9 in docker
            Reporter: William Reynolds


Since upgrading a cluster from 1.1.0 to 2.4.1, the broker no longer deals 
correctly with a consumer sending a join after a leave.

What happens now is that if a consumer sends a leave and then follows up by 
trying to send a join again as it is shutting down, the group coordinator adds 
the leaving member to the group but never seems to heartbeat that member.

Since the consumer is then gone, when it joins again after starting it is 
added as a new member, but the zombie member is still there and is included in 
the partition assignment, which means that those partitions never get consumed 
from. What can also happen is that one of the zombies becomes group leader, so 
the rebalance gets stuck forever and the group is entirely blocked.

I have not been able to track down where this got introduced between 1.1.0 and 
2.4.1, but I will look further into this. Unfortunately the logs are 
essentially silent about the zombie members, and I only had INFO level logging 
on during the issue. By stopping all the consumers in the group and restarting 
the broker coordinating that group, we could get back to a working state.





[GitHub] [kafka] abbccdda commented on pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-06-04 Thread GitBox


abbccdda commented on pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#issuecomment-639265937


   retest please
   







[GitHub] [kafka] ning2008wisc commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0

2020-06-04 Thread GitBox


ning2008wisc commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r435705249



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -316,6 +316,69 @@ public void testReplication() throws InterruptedException {
 backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * 
RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
 }
 
+
+@Test
+public void testOneWayReplicationWithAutorOffsetSync1() throws 
InterruptedException {
+
+// create consumers before starting the connectors so we don't need to 
wait for discovery
+Consumer consumer1 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+"group.id", "consumer-group-1"), "test-topic-1");
+consumer1.poll(Duration.ofMillis(500));
+consumer1.commitSync();
+consumer1.close();
+
+// enable automated consumer group offset sync
+mm2Props.put("sync.group.offsets.enabled", "true");
+mm2Props.put("sync.group.offsets.interval.seconds", "1");
+// one way replication from primary to backup
+mm2Props.put("backup->primary.enabled", "false");
+
+mm2Config = new MirrorMakerConfig(mm2Props);
+
+waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup");
+
+// sleep 5 seconds to ensure the automated group offset sync is 
complete
+time.sleep(5000);
+
+// create a consumer at backup cluster with same consumer group Id to 
consume 1 topic
+Consumer consumer = 
backup.kafka().createConsumerAndSubscribeTo(
+Collections.singletonMap("group.id", "consumer-group-1"), 
"primary.test-topic-1");
+ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+// the size of consumer record should be zero, because the offsets of 
the same consumer group
+// have been automatically synchronized from primary to backup by the 
background job, so no
+// more records to consume from the replicated topic by the same 
consumer group at backup cluster
+assertTrue("consumer record size is not zero", records.count() == 0);
+
+// now create a new topic in primary cluster
+primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
+backup.kafka().createTopic("primary.test-topic-2", 1);
+// produce some records to the new topic in primary cluster
+for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+primary.kafka().produce("test-topic-2", i % NUM_PARTITIONS, "key", 
"message-1-" + i);
+}
+
+// create a consumer at primary cluster to consume the new topic
+consumer1 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+"group.id", "consumer-group-1"), "test-topic-2");
+consumer1.poll(Duration.ofMillis(500));
+consumer1.commitSync();
+consumer1.close();
+
+// sleep 5 seconds to ensure the automated group offset sync is 
complete
+time.sleep(5000);
+
+// create a consumer at backup cluster with same consumer group Id to 
consume old and new topic
+consumer = 
backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+"group.id", "consumer-group-1"), "primary.test-topic-1", 
"primary.test-topic-2");
+
+records = consumer.poll(Duration.ofMillis(500));
+// similar reasoning as above, no more records to consume by the same 
consumer group at backup cluster
+assertTrue("consumer record size is not zero", records.count() == 0);

Review comment:
   updated

##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -207,8 +185,30 @@ public void close() {
 backup.stop();
 }
 
+
+
 @Test
 public void testReplication() throws InterruptedException {
+
+// create consumers before starting the connectors so we don't need to 
wait for discovery
+Consumer consumer3 = 
primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(

Review comment:
   updated to `consumer1` and `consumer2`

##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
##
@@ -64,4 +70,58 @@ public void testCheckpoint() {
 assertEquals(13, checkpoint2.downstreamOffset());
 assertEquals(234L, sourceRecord2.timestamp().longValue());
 }
+
+@Test
+public void testSyncOffset() {
+Map> 
idleConsumerGroupsOffset = new HashMap<>();
+Map> checkpointsPerConsumerGroup = new 
HashMap<>();
+
+String consumer1 = "consumer1";
+String consumer2 = "consumer2";
+
+String topic1 = "topic1";
+String topic2 = "topic2";
+
+// 'c1t1' denotes consumer offsets of all partitions of topic1 for 
consumer1
+   

[jira] [Commented] (KAFKA-10090) Misleading warnings: The configuration was supplied but isn't a known config

2020-06-04 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126412#comment-17126412
 ] 

Chia-Ping Tsai commented on KAFKA-10090:


It seems to me that the PR should include the following changes.

1. 
[ChannelBuilders#channelBuilderConfigs|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java#L162]
 should not return a copy of configs.

2. verify the unused keys (see 
https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java#L71)

[~rwruck] Feel free to file a PR to resolve this issue. A PR would help bring 
more discussion :)



> Misleading warnings: The configuration was supplied but isn't a known config
> 
>
> Key: KAFKA-10090
> URL: https://issues.apache.org/jira/browse/KAFKA-10090
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Robert Wruck
>Priority: Major
>
> In our setup (using Spring cloud stream Kafka binder), we see log messages 
> like:
>  
> {{The configuration 'ssl.keystore.password' was supplied but isn't a known 
> config}}
>  
> logged by org.apache.kafka.clients.admin.AdminClientConfig. The Kafka binder 
> actually uses SSL and security.protocol is set to SSL.
> Looking through the code, a few things seem odd:
>  * The log message says "isn't a known config" but that's not true. It is 
> *known*, i.e. defined in ConfigDef, but not *used*.
>  * The method for detecting whether a config is actually *used* is not 
> complete. ChannelBuilders.channelBuilderConfigs() for example extracts the 
> configs to use for the created channel builder using *new 
> HashMap(config.values())* thus *get()* won't mark a config as used anymore.
>  
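The tracking gap described in the quoted issue can be illustrated with a minimal, self-contained sketch. `RecordingMap` below is a hypothetical stand-in for a config map that marks keys as used on `get()`; it is not Kafka's actual class. The sketch only shows that reading through a `new HashMap<>(...)` copy leaves the original unaware that the key was read.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class UsedConfigTracking {

    /** Hypothetical stand-in: a map that remembers which keys were read via get(). */
    static class RecordingMap extends HashMap<String, Object> {
        private final Set<String> used = new HashSet<>();

        RecordingMap(Map<String, Object> values) {
            super(values);
        }

        @Override
        public Object get(Object key) {
            if (key instanceof String) {
                used.add((String) key);
            }
            return super.get(key);
        }

        /** Keys that were supplied but never read through this map. */
        Set<String> unused() {
            Set<String> keys = new HashSet<>(keySet());
            keys.removeAll(used);
            return keys;
        }
    }

    private static RecordingMap sampleConfigs() {
        return new RecordingMap(
            Collections.<String, Object>singletonMap("ssl.keystore.password", "secret"));
    }

    /** Reads the value through a plain copy, so the original never sees the read. */
    static Set<String> unusedWhenReadingCopy() {
        RecordingMap configs = sampleConfigs();
        Map<String, Object> copy = new HashMap<>(configs);
        copy.get("ssl.keystore.password");
        return configs.unused();
    }

    /** Reads the value through the recording map itself, so the key is marked used. */
    static Set<String> unusedWhenReadingOriginal() {
        RecordingMap configs = sampleConfigs();
        configs.get("ssl.keystore.password");
        return configs.unused();
    }

    public static void main(String[] args) {
        System.out.println("read via copy, unused keys: " + unusedWhenReadingCopy());
        System.out.println("read via original, unused keys: " + unusedWhenReadingOriginal());
    }
}
```

Under these assumptions, the key stays in the unused set in the first case even though the value was read, which is the kind of situation that would produce the misleading warning.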





[jira] [Created] (KAFKA-10106) measure and log time taken to handle LeaderAndIsr request

2020-06-04 Thread NIKHIL (Jira)
NIKHIL created KAFKA-10106:
--

 Summary: measure and log time taken to handle LeaderAndIsr request 
 Key: KAFKA-10106
 URL: https://issues.apache.org/jira/browse/KAFKA-10106
 Project: Kafka
  Issue Type: Improvement
Reporter: NIKHIL


ReplicaManager!becomeLeaderOrFollower handles the LeaderAndIsr request, and 
StateChangeLogger logs when this request is handled. However, it would be 
useful to also log when this call ends and record the time taken, which can 
help operationally. 

The proposal is for ReplicaManager!becomeLeaderOrFollower to start measuring 
the time before the `replicaStateChangeLock` is acquired and to log the elapsed 
time before the response is returned. 
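A rough sketch of the proposed measurement, written in Java for illustration only (the real ReplicaManager is Scala and this is not its code). The class name, the `handleTimed` helper, and the use of `System.out` in place of a logger are all assumptions; only the idea of starting the clock before the lock is taken and logging before returning comes from the proposal.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class TimedRequestHandling {

    // Hypothetical stand-in for the lock named in the proposal.
    private static final ReentrantLock replicaStateChangeLock = new ReentrantLock();

    /**
     * Runs the handler under the lock and logs the total elapsed time,
     * including the time spent waiting to acquire the lock.
     */
    static <T> T handleTimed(String requestName, Supplier<T> handler) {
        long startNanos = System.nanoTime(); // start before the lock is acquired
        try {
            replicaStateChangeLock.lock();
            try {
                return handler.get();
            } finally {
                replicaStateChangeLock.unlock();
            }
        } finally {
            // Logged before the response is handed back to the caller.
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            System.out.println("Finished handling " + requestName + " in " + elapsedMs + " ms");
        }
    }

    public static void main(String[] args) {
        String response = handleTimed("LeaderAndIsr", () -> "response");
        System.out.println(response);
    }
}
```

Starting the timer before `lock()` means lock contention shows up in the logged duration, which seems to be the operationally interesting part.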

 





[GitHub] [kafka] mimaison commented on pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs

2020-06-04 Thread GitBox


mimaison commented on pull request #8311:
URL: https://github.com/apache/kafka/pull/8311#issuecomment-638836545


   retest this please







[GitHub] [kafka] mimaison commented on a change in pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs

2020-06-04 Thread GitBox


mimaison commented on a change in pull request #8311:
URL: https://github.com/apache/kafka/pull/8311#discussion_r435243287



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -570,6 +592,27 @@ public void testDeleteTopics() throws Exception {
 }
 }
 
+@Test
+public void testDeleteTopicsPartialResponse() throws Exception {
+try (AdminClientUnitTestEnv env = mockClientEnv()) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+env.kafkaClient().prepareResponse(body -> body instanceof 
DeleteTopicsRequest,
+prepareDeleteTopicsResponse("myTopic", Errors.NONE));
+Map> values = 
env.adminClient().deleteTopics(asList("myTopic", "myOtherTopic"),
+new DeleteTopicsOptions()).values();
+values.get("myTopic").get();
+
+try {

Review comment:
   We can use `TestUtils.assertFutureThrows()` here too

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -3382,6 +3425,90 @@ public void testAlterClientQuotas() throws Exception {
 }
 }
 
+@Test
+public void testAlterReplicaLogDirsSuccess() throws Exception {
+try (AdminClientUnitTestEnv env = mockClientEnv()) {
+createAlterLogDirsResponse(env, env.cluster().nodeById(0), 
Errors.NONE, 0);
+createAlterLogDirsResponse(env, env.cluster().nodeById(1), 
Errors.NONE, 0);
+
+TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 
0);
+TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 
1);
+
+Map logDirs = new HashMap<>();
+logDirs.put(tpr0, "/data0");
+logDirs.put(tpr1, "/data1");
+AlterReplicaLogDirsResult result = 
env.adminClient().alterReplicaLogDirs(logDirs);
+assertNull(result.values().get(tpr0).get());
+assertNull(result.values().get(tpr1).get());
+}
+}
+
+@Test
+public void testAlterReplicaLogDirsLogDirNotFound() throws Exception {
+try (AdminClientUnitTestEnv env = mockClientEnv()) {
+createAlterLogDirsResponse(env, env.cluster().nodeById(0), 
Errors.NONE, 0);
+createAlterLogDirsResponse(env, env.cluster().nodeById(1), 
Errors.LOG_DIR_NOT_FOUND, 0);
+
+TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 
0);
+TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 
1);
+
+Map logDirs = new HashMap<>();
+logDirs.put(tpr0, "/data0");
+logDirs.put(tpr1, "/data1");
+AlterReplicaLogDirsResult result = 
env.adminClient().alterReplicaLogDirs(logDirs);
+assertNull(result.values().get(tpr0).get());
+TestUtils.assertFutureError(result.values().get(tpr1), 
LogDirNotFoundException.class);
+}
+}
+
+@Test
+public void testAlterReplicaLogDirsUnrequested() throws Exception {
+try (AdminClientUnitTestEnv env = mockClientEnv()) {
+createAlterLogDirsResponse(env, env.cluster().nodeById(0), 
Errors.NONE, 1, 2);
+
+TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 1, 
0);
+
+Map logDirs = new HashMap<>();
+logDirs.put(tpr1, "/data1");
+AlterReplicaLogDirsResult result = 
env.adminClient().alterReplicaLogDirs(logDirs);
+assertNull(result.values().get(tpr1).get());
+}
+}
+
+@Test
+public void testAlterReplicaLogDirsPartialResponse() throws Exception {
+try (AdminClientUnitTestEnv env = mockClientEnv()) {
+createAlterLogDirsResponse(env, env.cluster().nodeById(0), 
Errors.NONE, 1);
+
+TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 1, 
0);
+TopicPartitionReplica tpr2 = new TopicPartitionReplica("topic", 2, 
0);
+
+Map logDirs = new HashMap<>();
+logDirs.put(tpr1, "/data1");
+logDirs.put(tpr2, "/data1");
+AlterReplicaLogDirsResult result = 
env.adminClient().alterReplicaLogDirs(logDirs);
+assertNull(result.values().get(tpr1).get());
+try {

Review comment:
   we can use `TestUtils.assertFutureThrows()` here too
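For readers unfamiliar with this kind of helper, the following is a minimal sketch of what a future-assertion utility might look like, replacing a manual try/catch around `future.get()`. It is a hypothetical local helper that only borrows the name from the review comment; it is not Kafka's `TestUtils` implementation and its exact signature is an assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureAssertions {

    /**
     * Hypothetical helper: waits for the future and asserts that it failed
     * with a cause of the expected type, returning that cause.
     */
    static <T extends Throwable> T assertFutureThrows(Future<?> future, Class<T> expectedCause) {
        try {
            future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (expectedCause.isInstance(cause)) {
                return expectedCause.cast(cause);
            }
            throw new AssertionError("Unexpected cause: " + cause, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        }
        throw new AssertionError("Expected the future to fail with " + expectedCause.getName());
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        IllegalStateException cause = assertFutureThrows(failed, IllegalStateException.class);
        System.out.println(cause.getMessage());
    }
}
```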









[GitHub] [kafka] ijuma merged pull request #8644: KAFKA-9313: Set `use_all_dns_ips` as the new default for `client.dns.lookup` (KIP-602)

2020-06-04 Thread GitBox


ijuma merged pull request #8644:
URL: https://github.com/apache/kafka/pull/8644


   







[GitHub] [kafka] ijuma commented on pull request #8644: KAFKA-9313: Set `use_all_dns_ips` as the new default for `client.dns.lookup` (KIP-602)

2020-06-04 Thread GitBox


ijuma commented on pull request #8644:
URL: https://github.com/apache/kafka/pull/8644#issuecomment-638845776


   Thanks for the contribution! Merged to trunk and 2.6 branches.







[GitHub] [kafka] tombentley commented on pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs

2020-06-04 Thread GitBox


tombentley commented on pull request #8311:
URL: https://github.com/apache/kafka/pull/8311#issuecomment-638846166


   @mimaison done.







[jira] [Created] (KAFKA-10102) Source node references not updated after rebuilding topology

2020-06-04 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10102:
---

 Summary: Source node references not updated after rebuilding 
topology
 Key: KAFKA-10102
 URL: https://issues.apache.org/jira/browse/KAFKA-10102
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Sophie Blee-Goldman
 Fix For: 2.6.0


Luckily this bug was caught by 
RegexSourceIntegrationTest#testRegexRecordsAreProcessedAfterReassignment – we 
saw it fail with an NPE during SourceNode#deserializeKey, implying that the key 
deserializer was null, which in turn implies that the source node was never 
initialized.

This can happen when a task is updated with new regex matched topic partitions. 
In order to update the topology with the new input partitions, we actually just 
create an entirely new topology from scratch including building new source node 
objects. We then re/initialize this new topology once the task is resumed.

The problem is that the task's RecordQueues save a reference to the 
corresponding source node, and use this to pass polled records into the 
topology. But the RecordQueues aren't updated with the newly built source nodes 
and still point to the original nodes.

If the task had not completed restoration before being updated with new 
partitions, it would never have initialized the original topology or source 
nodes, resulting in an NPE when the RecordQueue passes a record to the old, 
uninitialized source node.
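The failure mode above can be reproduced in miniature with the sketch below. `SourceNode` and `RecordQueue` here are simplified, hypothetical stand-ins that share only their names with the Streams classes, and `updateSource` is an invented method used to show one possible shape of a fix; none of this is the actual Streams code.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class StaleSourceNodeSketch {

    /** Simplified stand-in: the key deserializer stays null until init() is called. */
    static class SourceNode {
        private Function<byte[], String> keyDeserializer;

        void init() {
            keyDeserializer = bytes -> new String(bytes, StandardCharsets.UTF_8);
        }

        String deserializeKey(byte[] rawKey) {
            return keyDeserializer.apply(rawKey); // NPE if init() never ran
        }
    }

    /** Simplified stand-in: holds a reference to the node it feeds records into. */
    static class RecordQueue {
        private SourceNode source;

        RecordQueue(SourceNode source) {
            this.source = source;
        }

        // Hypothetical method: re-point the queue at a rebuilt node.
        void updateSource(SourceNode rebuilt) {
            this.source = rebuilt;
        }

        String process(byte[] rawKey) {
            return source.deserializeKey(rawKey);
        }
    }

    /** The queue keeps pointing at the original, never-initialized node. */
    static boolean staleReferenceThrows() {
        SourceNode original = new SourceNode(); // task was still restoring: no init()
        RecordQueue queue = new RecordQueue(original);
        SourceNode rebuilt = new SourceNode();  // topology rebuilt from scratch
        rebuilt.init();                         // only the new node is initialized
        try {
            queue.process("k".getBytes(StandardCharsets.UTF_8));
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    /** Same scenario, but the queue is updated to reference the rebuilt node. */
    static String afterUpdatingReference() {
        RecordQueue queue = new RecordQueue(new SourceNode());
        SourceNode rebuilt = new SourceNode();
        rebuilt.init();
        queue.updateSource(rebuilt);
        return queue.process("k".getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println("stale reference throws NPE: " + staleReferenceThrows());
        System.out.println("after updating reference: " + afterUpdatingReference());
    }
}
```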

This is the only specific known bug, but I haven't checked the entire code base 
so it's possible there are other saved node references that might result in 
bugs. We should try to avoid rebuilding an entirely new topology if at all 
possible, and see if we can just update the input partitions only where 
necessary.





[GitHub] [kafka] ijuma commented on pull request #8644: KAFKA-9313: Set `use_all_dns_ips` as the new default for `client.dns.lookup` (KIP-602)

2020-06-04 Thread GitBox


ijuma commented on pull request #8644:
URL: https://github.com/apache/kafka/pull/8644#issuecomment-638622720


   retest this please







[GitHub] [kafka] highluck commented on pull request #8107: MINOR: Remove Diamond and code code Alignment

2020-06-04 Thread GitBox


highluck commented on pull request #8107:
URL: https://github.com/apache/kafka/pull/8107#issuecomment-638630457


   @vvcephei 
   code update!







[GitHub] [kafka] tombentley commented on pull request #8755: KAFKA-10069: Correctly remove user-defined "predicate" and "negate" configs from transformation properties

2020-06-04 Thread GitBox


tombentley commented on pull request #8755:
URL: https://github.com/apache/kafka/pull/8755#issuecomment-638661645


   Yes, thanks, that was a good spot @chia7712!






