[jira] [Updated] (KAFKA-10429) Group Coordinator unavailability leads to missing events

2020-08-24 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-10429:
--
Summary: Group Coordinator unavailability leads to missing events  (was: 
Group Coordinator is unavailable leads to missing events)

> Group Coordinator unavailability leads to missing events
> 
>
> Key: KAFKA-10429
> URL: https://issues.apache.org/jira/browse/KAFKA-10429
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1
>Reporter: Navinder Brar
>Priority: Major
>
> We are regularly seeing this message in the logs:
> [2020-08-25 03:24:59,214] INFO [Consumer 
> clientId=appId-StreamThread-1-consumer, groupId=dashavatara] Group 
> coordinator ip:9092 (id: 1452096777 rack: null) is *unavailable* or invalid, 
> will attempt rediscovery 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
>  
> And after some time the coordinator is rediscovered:
> [2020-08-25 03:25:02,218] INFO [Consumer 
> clientId=appId-c3d1d186-e487-4993-ae3d-5fed75887e6b-StreamThread-1-consumer, 
> groupId=appId] Discovered group coordinator ip:9092 (id: 1452096777 rack: 
> null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
>  
> Now, the doubt I have is why this unavailability doesn't trigger a rebalance 
> in the cluster. We have a few hours of retention on the source Kafka topics, 
> and sometimes this unavailability lasts for more than a few hours. Since it 
> doesn't trigger a rebalance or stop processing on the other nodes (which are 
> connected to the group coordinator), we never find out that an issue has 
> occurred, and by then we have lost events from our source topics.
>  
> There are some resolutions mentioned on Stack Overflow, but those configs are 
> already set in our Kafka:
> default.replication.factor=3
> offsets.topic.replication.factor=3
>  
> It would be great to understand why this issue happens, why it doesn't 
> trigger a rebalance, and whether there is any known solution for it.





[GitHub] [kafka] ryannedolan commented on a change in pull request #9215: KAFKA-10133: MM2 readme update on config

2020-08-24 Thread GitBox


ryannedolan commented on a change in pull request #9215:
URL: https://github.com/apache/kafka/pull/9215#discussion_r476170655



##
File path: connect/mirror/README.md
##
@@ -141,7 +141,38 @@ nearby clusters.
 N.B. that the `--clusters` parameter is not technically required here. MM2 
will work fine without it; however, throughput may suffer from "producer lag" 
between
 data centers, and you may incur unnecessary data transfer costs.
 
-## Shared configuration
+## Configuration
+### General Kafka Connect Config
+All Kafka Connect, Source Connector, Sink Connector configs, as defined in 
[Kafka official doc] (https://kafka.apache.org/documentation/#connectconfigs), 
can be 
+directly used in MM2 configuration without prefix in the configuration name. 
As the starting point, most of these configs may work well with the exception 
of `tasks.max`.

Review comment:
   I think you are referring to the default values of the Connect 
configuration properties -- maybe use "default" in this sentence to make that 
more clear.
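
   For illustration, a minimal mm2.properties sketch of the pattern the README 
change describes -- a plain Connect-level setting used without any cluster 
prefix (cluster aliases and broker addresses below are placeholders, and 
`tasks.max` defaults to 1):

    # plain Connect config, no prefix, raised from its default of 1
    tasks.max = 10

    # the usual MM2 flow definitions, for comparison
    clusters = A, B
    A.bootstrap.servers = a-broker:9092
    B.bootstrap.servers = b-broker:9092
    A->B.enabled = true
    A->B.topics = .*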









[jira] [Created] (KAFKA-10429) Group Coordinator is unavailable leads to missing events

2020-08-24 Thread Navinder Brar (Jira)
Navinder Brar created KAFKA-10429:
-

 Summary: Group Coordinator is unavailable leads to missing events
 Key: KAFKA-10429
 URL: https://issues.apache.org/jira/browse/KAFKA-10429
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.1.1
Reporter: Navinder Brar


We are regularly seeing this message in the logs:

[2020-08-25 03:24:59,214] INFO [Consumer 
clientId=appId-StreamThread-1-consumer, groupId=dashavatara] Group coordinator 
ip:9092 (id: 1452096777 rack: null) is *unavailable* or invalid, will attempt 
rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

And after some time the coordinator is rediscovered:

[2020-08-25 03:25:02,218] INFO [Consumer 
clientId=appId-c3d1d186-e487-4993-ae3d-5fed75887e6b-StreamThread-1-consumer, 
groupId=appId] Discovered group coordinator ip:9092 (id: 1452096777 rack: null) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

Now, the doubt I have is why this unavailability doesn't trigger a rebalance in 
the cluster. We have a few hours of retention on the source Kafka topics, and 
sometimes this unavailability lasts for more than a few hours. Since it doesn't 
trigger a rebalance or stop processing on the other nodes (which are connected 
to the group coordinator), we never find out that an issue has occurred, and by 
then we have lost events from our source topics.

There are some resolutions mentioned on Stack Overflow, but those configs are 
already set in our Kafka:

default.replication.factor=3

offsets.topic.replication.factor=3

It would be great to understand why this issue happens, why it doesn't trigger 
a rebalance, and whether there is any known solution for it.
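
A quick way to double-check that such settings actually took effect is to 
inspect the internal offsets topic with AdminClient. A minimal sketch (the 
broker address is a placeholder and the class name is invented for 
illustration):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.TopicDescription;

    public class OffsetsTopicCheck {
        public static void main(String[] args) throws Exception {
            final Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
            try (Admin admin = Admin.create(props)) {
                final TopicDescription desc = admin
                    .describeTopics(Collections.singleton("__consumer_offsets"))
                    .values().get("__consumer_offsets").get();
                // with offsets.topic.replication.factor=3, every partition
                // should report three replicas
                desc.partitions().forEach(p ->
                    System.out.printf("partition %d replicas=%d isr=%d%n",
                        p.partition(), p.replicas().size(), p.isr().size()));
            }
        }
    }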





[GitHub] [kafka] LMnet commented on pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)

2020-08-24 Thread GitBox


LMnet commented on pull request #8955:
URL: https://github.com/apache/kafka/pull/8955#issuecomment-679502482


   @vvcephei I completely forgot about tests, thanks for reminding me! I 
changed all usages of the old `org.apache.kafka.streams.scala.Serdes` to the 
new `org.apache.kafka.streams.scala.serialization.Serdes` in tests. I think it 
should be enough in terms of test coverage.
   
   Also, I fixed `KGroupedStream`, `SessionWindowedKStream`, 
`TimeWindowedKStream`, and docs: they used the old `Serdes` object.
   
   I was not sure about the commit policy in Kafka, so I added all these changes 
in a separate commit. I can squash it later if needed.







[jira] [Assigned] (KAFKA-10424) MirrorMaker 2.0 does not replicate topic's "clean.policy"

2020-08-24 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang reassigned KAFKA-10424:
--

Assignee: Ning Zhang

> MirrorMaker 2.0 does not replicate topic's "clean.policy"
> --
>
> Key: KAFKA-10424
> URL: https://issues.apache.org/jira/browse/KAFKA-10424
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Mikhail Grinfeld
>Assignee: Ning Zhang
>Priority: Major
>
> I needed to replicate the schema-registry "_schemas" topic. 
> Data was replicated successfully and everything looked good, but the new 
> schema-registry started with a warning that the replicated topic's 
> cleanup.policy is not "compact".





[GitHub] [kafka] LMnet commented on a change in pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)

2020-08-24 Thread GitBox


LMnet commented on a change in pull request #8955:
URL: https://github.com/apache/kafka/pull/8955#discussion_r476102121



##
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
##
@@ -24,6 +24,7 @@ import java.util
 import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes => 
JSerdes, Serializer}
 import org.apache.kafka.streams.kstream.WindowedSerdes
 
+@deprecated("Use org.apache.kafka.streams.scala.serialization.Serdes")

Review comment:
   Done









[GitHub] [kafka] huxihx opened a new pull request #9218: MINOR: Fix shouldNotResetEpochHistoryHeadIfUndefinedPassed

2020-08-24 Thread GitBox


huxihx opened a new pull request #9218:
URL: https://github.com/apache/kafka/pull/9218


   In LeaderEpochFileCacheTest.scala, the code is identical for 
`shouldNotResetEpochHistoryHeadIfUndefinedPassed` and 
`shouldNotResetEpochHistoryTailIfUndefinedPassed`. It seems `truncateFromStart` 
should be invoked in `shouldNotResetEpochHistoryHeadIfUndefinedPassed` instead 
of `truncateFromEnd`.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] guozhangwang commented on pull request #9038: KAFKA-10134: Check heartbeat timeout for poll fetches [DO NOT MERGE]

2020-08-24 Thread GitBox


guozhangwang commented on pull request #9038:
URL: https://github.com/apache/kafka/pull/9038#issuecomment-679463606


   Closing as it will be subsumed by https://github.com/apache/kafka/pull/8834







[GitHub] [kafka] guozhangwang closed pull request #9038: KAFKA-10134: Check heartbeat timeout for poll fetches [DO NOT MERGE]

2020-08-24 Thread GitBox


guozhangwang closed pull request #9038:
URL: https://github.com/apache/kafka/pull/9038


   







[GitHub] [kafka] mjsax commented on a change in pull request #9217: MINOR: fix JavaDoc

2020-08-24 Thread GitBox


mjsax commented on a change in pull request #9217:
URL: https://github.com/apache/kafka/pull/9217#discussion_r476026230



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##
@@ -106,7 +105,7 @@ void register(final StateStore store,
 
 /**
  * Schedules a periodic operation for processors. A processor may call 
this method during
- * {@link 
Processor#init(org.apache.kafka.streams.processor.ProcessorContext) 
initialization} or
+ * {@link Processor#init(ProcessorContext) initialization} or

Review comment:
   Removing the package name to point to 
`org.apache.kafka.streams.processor.api.ProcessorContext` (note the `api` 
sub-package in the path)









[GitHub] [kafka] mjsax opened a new pull request #9217: MINOR: fix JavaDoc

2020-08-24 Thread GitBox


mjsax opened a new pull request #9217:
URL: https://github.com/apache/kafka/pull/9217


   Call for review @vvcephei 







[GitHub] [kafka] mjsax commented on a change in pull request #9217: MINOR: fix JavaDoc

2020-08-24 Thread GitBox


mjsax commented on a change in pull request #9217:
URL: https://github.com/apache/kafka/pull/9217#discussion_r476025687



##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/StreamsNotStartedException.java
##
@@ -17,11 +17,12 @@
 package org.apache.kafka.streams.errors;
 
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
 
 /**
- * Indicates that Kafka Streams is in state {@link KafkaStreams.State#CREATED 
CREATED} and thus state stores cannot be queries yet.
+ * Indicates that Kafka Streams is in state {@link State CREATED} and thus 
state stores cannot be queries yet.

Review comment:
   We can only link to the enum, but not its values.









[GitHub] [kafka] huxihx commented on pull request #8984: KAFKA-10227: Enforce cleanup policy to only contain compact or delete once

2020-08-24 Thread GitBox


huxihx commented on pull request #8984:
URL: https://github.com/apache/kafka/pull/8984#issuecomment-679438909


   @abbccdda Hi, do you have any time to review this patch? It's been quite a 
long time and no one has had a look at it :(







[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-08-24 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183654#comment-17183654
 ] 

Guozhang Wang commented on KAFKA-10134:
---

[~zhowei] Thanks for the new log files; they have been very helpful for nailing 
down the root cause, and I will refine an existing WIP PR 
https://github.com/apache/kafka/pull/8834 into a final fix for this. Please 
stay tuned.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.5.2, 2.6.1
>
> Attachments: consumer3.log.2020-08-20.log, 
> consumer5.log.2020-07-22.log
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during rebalances, as our tasks are long-running.
> But after the upgrade, when we kill an instance to trigger a rebalance while 
> there is some load (some tasks run longer than 30s), the CPU goes sky-high. 
> It reads ~700% in our metrics, so several threads must be in a tight loop. We 
> have several consumer threads consuming from different partitions during the 
> rebalance. This is reproducible with both the new CooperativeStickyAssignor 
> and the old eager rebalance protocol. The difference is that with the old 
> eager protocol the high CPU usage drops after the rebalance is done, but with 
> the cooperative one the consumer threads seem to be stuck on something and 
> can't finish the rebalance, so the high CPU usage won't drop until we stop 
> the load. A small load without long-running tasks also doesn't cause 
> continuous high CPU usage, since the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code, we found the clients look to be in a loop trying 
> to find the coordinator.
> I also tried the old rebalance protocol with the new version; the issue still 
> exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which doesn't seem to have this issue, so it 
> seems related to something that changed between 2.4.1 and 2.5.0.
>  
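
For context, the cooperative protocol mentioned above is enabled on a plain 
consumer roughly like this (a sketch; the broker address, group id, and topic 
are placeholders):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class CooperativeConsumer {
        public static void main(String[] args) {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      ByteArrayDeserializer.class.getName());
            // opt in to incremental (cooperative) rebalancing instead of eager
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                      CooperativeStickyAssignor.class.getName());
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singleton("my-topic"));
                consumer.poll(Duration.ofMillis(100));
            }
        }
    }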





[GitHub] [kafka] guozhangwang merged pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-08-24 Thread GitBox


guozhangwang merged pull request #9094:
URL: https://github.com/apache/kafka/pull/9094


   







[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-08-24 Thread GitBox


ableegoldman commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r476003479



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##
@@ -163,7 +164,17 @@ public void put(final Bytes key, final byte[] value, final 
long windowStartTimes
 @Deprecated
 @Override
 public WindowStoreIterator<byte[]> fetch(final Bytes key, final long 
timeFrom, final long timeTo) {
+return fetch(key, timeFrom, timeTo, true);
+}
+
+@Override
+public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final 
Instant from, final Instant to) {
+final long timeFrom = ApiUtils.validateMillisecondInstant(from, 
prepareMillisCheckFailMsgPrefix(from, "from"));
+final long timeTo = ApiUtils.validateMillisecondInstant(to, 
prepareMillisCheckFailMsgPrefix(to, "to"));

Review comment:
   Ok, feel free to just create a followup ticket to see if we can clean 
things up. No need to block this PR on it









[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-08-24 Thread GitBox


ableegoldman commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r476001166



##
File path: streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
##
@@ -119,15 +118,16 @@
  * 
  * This iterator must be closed after use.
  *
- * @param from  the first key in the range
- * @param tothe last key in the range
- * @param timeFrom  time range start (inclusive)
- * @param timeTotime range end (inclusive)
+ * @param from the first key in the range
+ * @param to   the last key in the range
+ * @param timeFrom time range start (inclusive)
+ * @param timeTo   time range end (inclusive)
 * @return an iterator over windowed key-value pairs {@code <Windowed<K>, 
value>}
  * @throws InvalidStateStoreException if the store is not initialized
- * @throws NullPointerException if one of the given keys is {@code null}
+ * @throws NullPointerException   if one of the given keys is {@code 
null}
  */
-@SuppressWarnings("deprecation") // note, this method must be kept if 
super#fetch(...) is removed
+// note, this method must be kept if super#fetch(...) is removed
+@SuppressWarnings("deprecation")
 KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long 
timeTo);

Review comment:
   I guess the `note, this method must be kept if super#fetch(...) is 
removed` comments are making me nervous, but they could be out of date. Anyways 
I don't think you need to clean all this up right now, just wondering what's 
going on here









[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-08-24 Thread GitBox


ableegoldman commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r475999351



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##
@@ -419,13 +504,13 @@ Long minTime() {
 }

Review comment:
   I agree that it doesn't matter much, but I think it has to be "reverse" 
for both time and keys for correctness due to CachingWindowStore. The caching 
layer just puts everything into one byte buffer so when we go in reverse order 
it's just the opposite of forward, which means both key and time ordering is 
flipped. And we unfortunately need the ordering to match up between the cache 
and the underlying store due to that merging iterator guy
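
   As a loose illustration of that point (not the actual cache code): when 
entries are ordered by one composite (key, time) byte key, iterating the same 
structure in reverse necessarily flips both orderings at once.

    import java.util.NavigableMap;
    import java.util.TreeMap;

    public class ReverseOrderDemo {
        public static void main(String[] args) {
            // composite "key@time" strings stand in for the cache's byte keys
            final NavigableMap<String, String> cache = new TreeMap<>();
            cache.put("a@1", "v1");
            cache.put("a@2", "v2");
            cache.put("b@1", "v3");
            // forward order: a@1, a@2, b@1
            // reverse flips key AND time order: b@1, a@2, a@1
            cache.descendingMap().keySet().forEach(System.out::println);
        }
    }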









[GitHub] [kafka] jeqo commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-08-24 Thread GitBox


jeqo commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r475910689



##
File path: streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
##
@@ -119,15 +118,16 @@
  * 
  * This iterator must be closed after use.
  *
- * @param from  the first key in the range
- * @param tothe last key in the range
- * @param timeFrom  time range start (inclusive)
- * @param timeTotime range end (inclusive)
+ * @param from the first key in the range
+ * @param to   the last key in the range
+ * @param timeFrom time range start (inclusive)
+ * @param timeTo   time range end (inclusive)
 * @return an iterator over windowed key-value pairs {@code <Windowed<K>, 
value>}
  * @throws InvalidStateStoreException if the store is not initialized
- * @throws NullPointerException if one of the given keys is {@code null}
+ * @throws NullPointerException   if one of the given keys is {@code 
null}
  */
-@SuppressWarnings("deprecation") // note, this method must be kept if 
super#fetch(...) is removed
+// note, this method must be kept if super#fetch(...) is removed
+@SuppressWarnings("deprecation")
 KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long 
timeTo);

Review comment:
   These methods were introduced when adding Duration/Instant support 
https://github.com/apache/kafka/pull/5682.
   
   I don't think these are needed, we can do a similar change as for 
SessionStore read operations. wdyt?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##
@@ -419,13 +504,13 @@ Long minTime() {
 }

Review comment:
   For windowStore, only the time-based index is being iterated backward. The 
KIP didn't consider reversing key/value stores internally. 
   
   We would need another flag (apart from backward) to define the order of 
internal keys, which is cumbersome, and the order between keys doesn't matter 
much or can be computed by the user. 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##
@@ -426,7 +558,12 @@ private void getNextSegmentIterator() {
 setCacheKeyRange(currentSegmentBeginTime(), 
currentSegmentLastTime());

Review comment:
   Will have to double check this. I have inverted the current/last segment 
for backwards use-case though.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##
@@ -72,22 +86,40 @@
 searchSpace.iterator(),

Review comment:
   `searchSpace` will be reversed based on the `forward` flag, on 
`AbstractSegments`. 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##
@@ -163,7 +164,17 @@ public void put(final Bytes key, final byte[] value, final 
long windowStartTimes
 @Deprecated
 @Override
 public WindowStoreIterator<byte[]> fetch(final Bytes key, final long 
timeFrom, final long timeTo) {
+return fetch(key, timeFrom, timeTo, true);
+}
+
+@Override
+public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final 
Instant from, final Instant to) {
+final long timeFrom = ApiUtils.validateMillisecondInstant(from, 
prepareMillisCheckFailMsgPrefix(from, "from"));
+final long timeTo = ApiUtils.validateMillisecondInstant(to, 
prepareMillisCheckFailMsgPrefix(to, "to"));

Review comment:
   Only backward compatibility. If it makes sense to remove these deprecations 
as part of this KIP, I'd be happy to help clean them up. 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##
@@ -337,25 +462,32 @@ public synchronized void close() {
 
 private CacheIteratorWrapper(final Bytes key,
  final long timeFrom,
- final long timeTo) {
-this(key, key, timeFrom, timeTo);
+ final long timeTo,
+ final boolean forward) {
+this(key, key, timeFrom, timeTo, forward);
 }
 
 private CacheIteratorWrapper(final Bytes keyFrom,
  final Bytes keyTo,
  final long timeFrom,
- final long timeTo) {
+ final long timeTo,
+ final boolean forward) {
 this.keyFrom = keyFrom;
 this.keyTo = keyTo;
 this.timeTo = timeTo;
 this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, 
maxObservedTimestamp.get()));
+this.forward = forward;
 
 this.segmentInterval = 

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-24 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r475985073



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -328,6 +328,68 @@ public void testAggregateLargeInput() {
 );
 }
 
+@Test
+public void testEarlyRecords() {

Review comment:
   Can we add maybe one or two more tests? I think at the least we should 
have one test that processes _only_ early records, and one test that covers 
input(s) with the same timestamp.
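
   A rough sketch of one such test, in the style of the other tests in this 
class (it assumes the same imports and `props`; the store name and the 
assertion step are invented):

    @Test
    public void shouldAggregateOnlyEarlyRecords() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Windowed<String>, String> table = builder
            .stream("topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("early-records")
                    .withValueSerde(Serdes.String()));
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        table.toStream().process(supplier);
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic("topic", new StringSerializer(), new StringSerializer());
            // every record is "early": timestamp < the 10ms time difference
            inputTopic.pipeInput("A", "1", 3L);
            inputTopic.pipeInput("A", "2", 5L);
            inputTopic.pipeInput("A", "3", 5L); // duplicate timestamp
        }
        // assertions over supplier.theCapturedProcessor().processed() go here
    }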

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -148,7 +153,7 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 boolean rightWinAlreadyCreated = false;
 
 // keep the left type window closest to the record
-Window latestLeftTypeWindow = null;
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> latestLeftTypeWindow 
= null;
 try (
 final KeyValueIterator<Windowed<K>, 
ValueAndTimestamp<Agg>> iterator = windowStore.fetch(

Review comment:
   We need to make sure the `fetch` bounds don't go into the negative. We 
only call `processEarly` if the record's timestamp is within the 
timeDifferenceMs, but here we search starting at timestamp - 2*timeDifferenceMs
   

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 }
 }
 
+/**
+ * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come 
in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+ValueAndTimestamp<Agg> rightWinAgg = null;
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+boolean rightWinAlreadyCreated = false;
+final HashSet<Long> windowStartTimes = new HashSet<>();
+
+try (
+final KeyValueIterator<Windowed<K>, 
ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+timestamp - 2 * windows.timeDifferenceMs(),
+// to catch the current record's right window, if 
it exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long endTime = startTime + 
windows.timeDifferenceMs();
+
+if (endTime == windows.timeDifferenceMs()) {
+combinedWindow = next;
+} else if (endTime > timestamp && startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else {

Review comment:
   It took me a second to get this -- can we explicitly check `if startTime 
== timestamp + 1` instead of falling back to `else` and implicitly relying on 
the fetch bounds? You can just get rid of the `else` altogether or throw an 
IllegalStateException if none of the specific conditions are met and the else 
is reached, whatever makes sense to you

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 }
 }
 
+/**
+ * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come 
in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+ValueAndTimestamp<Agg> rightWinAgg = null;
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+boolean rightWinAlreadyCreated = false;
+final HashSet<Long> windowStartTimes = new HashSet<>();
+
+try (
+final KeyValueIterator<Windowed<K>, 

[GitHub] [kafka] jthompson6 opened a new pull request #9216: KAFKA-10428: Fix schema for header conversion in MirrorSourceTask.

2020-08-24 Thread GitBox


jthompson6 opened a new pull request #9216:
URL: https://github.com/apache/kafka/pull/9216


   The addBytes method adds the header using Schema.BYTES, which results in 
base64 encoding when the record is stored.  
SimpleHeaderConverter#toConnectHeader implements schema inference which we can 
use here, as the ConsumerRecord does not have header schema information.
   
   Testing: I added schema verification to the existing 
MirrorSourceTaskTest#testSerde, with a string and int example.
   







[GitHub] [kafka] ning2008wisc commented on pull request #9215: KAFKA-10133: MM2 readme update on config

2020-08-24 Thread GitBox


ning2008wisc commented on pull request #9215:
URL: https://github.com/apache/kafka/pull/9215#issuecomment-679420821


   Hi @ryannedolan @mimaison, just in case I could borrow a few minutes from 
you for a quick review. Thanks 







[GitHub] [kafka] guozhangwang commented on pull request #8834: MINOR: Do not disable heartbeat during Rebalance

2020-08-24 Thread GitBox


guozhangwang commented on pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#issuecomment-679419336


   @ableegoldman I'm re-targeting this against 
https://issues.apache.org/jira/browse/KAFKA-10134 now, as I found the root 
cause of that ticket is actually that we do not send heartbeats (and hence do 
not reset the heartbeat timer) during a rebalance. Hopefully I will have this 
PR in a final reviewable state for merging.
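   
   For reference, the consumer timing knobs involved in this discussion, shown 
with their approximate client defaults around 2.5 (a sketch, not a 
recommendation):
   
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");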







[jira] [Created] (KAFKA-10428) Mirror Maker connect applies base64 encoding to string headers

2020-08-24 Thread Jennifer Thompson (Jira)
Jennifer Thompson created KAFKA-10428:
-

 Summary: Mirror Maker connect applies base64 encoding to string 
headers
 Key: KAFKA-10428
 URL: https://issues.apache.org/jira/browse/KAFKA-10428
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.6.0, 2.5.0, 2.4.0
Reporter: Jennifer Thompson


MirrorSourceTask takes the header value as bytes from the ConsumerRecord, which 
does not have a header schema, and adds it to the SourceRecord headers using 
"addBytes". This uses Schema.BYTES as the schema for the header, and somehow, 
base64 encoding gets applied when the record gets committed.

This means that my original header value "with_headers" (created with a python 
producer, and stored as a 12 character byte array) becomes the string value 
"d2l0aF9oZWFkZXJz", a 16 character byte array, which is the base64 encoded 
version of the original. If I try to preempt this using "d2l0aF9oZWFkZXJz" to 
start with, and base64 encoding the headers everywhere, it just gets double 
encoded to "ZDJsMGFGOW9aV0ZrWlhKeg==" after passing through the 
MirrorSourceTask.

I think the base64 encoding may be coming from Values#append 
(https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L674),
 but I'm not sure how. That is invoked by 
SimpleConnectorHeader#fromConnectHeader via Values#convertToString.

SimpleHeaderConverter#toConnectHeader produces the correct schema in this case, 
and solves the problem for me, but it seems to guess at the schema, so I'm not 
sure if it is the right solution. Since schemas seem to be required for 
SourceRecord headers, but not available from ConsumerRecord headers, I'm not 
sure what other options we have. I will open a PR with this solution.
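
A hedged sketch of the contrast described above, using the Connect classes 
named in the report (topic and header names are placeholders; whether 
SimpleHeaderConverter needs to be configure()d first is left aside here):

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.header.ConnectHeaders;
    import org.apache.kafka.connect.storage.SimpleHeaderConverter;

    public class HeaderSchemaDemo {
        public static void main(String[] args) {
            final byte[] raw = "with_headers".getBytes(StandardCharsets.UTF_8);

            // what MirrorSourceTask does today: Schema.BYTES, which later
            // renders as base64
            final ConnectHeaders asBytes = new ConnectHeaders();
            asBytes.addBytes("hdr", raw);

            // the proposed fix: let SimpleHeaderConverter infer a schema
            final SimpleHeaderConverter converter = new SimpleHeaderConverter();
            final SchemaAndValue inferred = converter.toConnectHeader("topic", "hdr", raw);
            System.out.println(inferred.schema() + " -> " + inferred.value());
        }
    }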





[GitHub] [kafka] guozhangwang commented on pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-08-24 Thread GitBox


guozhangwang commented on pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#issuecomment-679408267


   LGTM.







[jira] [Commented] (KAFKA-10357) Handle accidental deletion of repartition-topics as exceptional failure

2020-08-24 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183630#comment-17183630
 ] 

Sophie Blee-Goldman commented on KAFKA-10357:
-

Cool. I agree, KAFKA-3370 would (and still will) be nice to have, but we can 
make some quick & easy progress on the data-loss problem with the 
initialize+config proposal. And since this approach means detecting the 
repartition deletion during a rebalance, we can just leverage the existing 
assignment error code to handle 3) in a similar fashion to KAFKA-10355 / KIP-662

> Handle accidental deletion of repartition-topics as exceptional failure
> ---
>
> Key: KAFKA-10357
> URL: https://issues.apache.org/jira/browse/KAFKA-10357
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>
> Repartition topics are both written by Stream's producer and read by Stream's 
> consumer, so when they are accidentally deleted both clients may be notified. 
> But in practice the consumer would react to it much quicker than producer 
> since the latter has a delivery timeout expiration period (see 
> https://issues.apache.org/jira/browse/KAFKA-10356). When consumer reacts to 
> it, it will re-join the group since metadata changed and during the triggered 
> rebalance it would auto-recreate the topic silently and continue, causing 
> silent data loss. 
> One idea, is to only create all repartition topics *once* in the first 
> rebalance and not auto-create them any more in future rebalances, instead it 
> would be treated similar as INCOMPLETE_SOURCE_TOPIC_METADATA error code 
> (https://issues.apache.org/jira/browse/KAFKA-10355).
> The challenging part would be how to determine if it is the first-ever 
> rebalance, and there are several wild ideas I'd like to throw out here:
> 1) change the thread state transition diagram so that STARTING state would 
> not transit to PARTITION_REVOKED but only to PARTITION_ASSIGNED, then in the 
> assign function we can check if the state is still in CREATED and not RUNNING.
> 2) augment the subscriptionInfo to encode whether or not this is the first 
> time ever rebalance.





[jira] [Commented] (KAFKA-10357) Handle accidental deletion of repartition-topics as exceptional failure

2020-08-24 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183626#comment-17183626
 ] 

Guozhang Wang commented on KAFKA-10357:
---

Theoretically, I think today there's no perfect solution to 2) above, since in 
the extreme case one can a) delete a topic, b) re-create it, and c) re-fill it 
so that it has the same offsets as before, all in between two consecutive 
consumer fetches, in which case the consumer would never detect that an issue 
happened. But on the other hand, I also agree with [~ableegoldman] that this is 
not the primary scenario we want to guard against anyway, and if people really 
go that wild, the procedure is outside Kafka's processing guarantees today. Our 
focus should be just 1), and to avoid us (Streams) re-creating the topics.

Given that, I think at the moment #initialize plus an internal config to 
disable auto-internal-topic-creation (by default we would still enable it, for 
compatibility) would be the easiest way to tackle 1), and it pushes the 
responsibility to users, who need to:

* Ideally, only pick one instance of their streams app to call initialize when 
starting their app for the first time --- note, if a query is "reset" then 
restarting is the same as starting for the first time.
* Set the internal config to disable auto-internal-topic-creation.

KAFKA-3370 can be helpful for both 1) and 2), but again it is not "perfect", so 
if we would have to eventually push it to users, then we'd better do it early 
rather than later.

> Handle accidental deletion of repartition-topics as exceptional failure
> ---
>
> Key: KAFKA-10357
> URL: https://issues.apache.org/jira/browse/KAFKA-10357
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>
> Repartition topics are both written by Stream's producer and read by Stream's 
> consumer, so when they are accidentally deleted both clients may be notified. 
> But in practice the consumer would react to it much quicker than producer 
> since the latter has a delivery timeout expiration period (see 
> https://issues.apache.org/jira/browse/KAFKA-10356). When consumer reacts to 
> it, it will re-join the group since metadata changed and during the triggered 
> rebalance it would auto-recreate the topic silently and continue, causing 
> silent data loss. 
> One idea, is to only create all repartition topics *once* in the first 
> rebalance and not auto-create them any more in future rebalances, instead it 
> would be treated similar as INCOMPLETE_SOURCE_TOPIC_METADATA error code 
> (https://issues.apache.org/jira/browse/KAFKA-10355).
> The challenging part would be how to determine if it is the first-ever 
> rebalance, and there are several wild ideas I'd like to throw out here:
> 1) change the thread state transition diagram so that STARTING state would 
> not transit to PARTITION_REVOKED but only to PARTITION_ASSIGNED, then in the 
> assign function we can check if the state is still in CREATED and not RUNNING.
> 2) augment the subscriptionInfo to encode whether or not this is the first 
> time ever rebalance.





[GitHub] [kafka] ableegoldman commented on pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-08-24 Thread GitBox


ableegoldman commented on pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#issuecomment-679399444


   @guozhangwang can we finally merge this?  







[GitHub] [kafka] badaiaqrandista commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-08-24 Thread GitBox


badaiaqrandista commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-679398619


   @bbejeck 
   
   It failed because it printed the partition number as a single integer after 
the value. I moved the partition to be before the key (if printed) and the 
value, and also added the prefix "Partition:" to differentiate it from 
"Offset:".
   
   It's only printed if "print.partition=true".
   
   I will keep the tests as they are, then. ConsoleConsumerTest is a unit test 
for the class, while DefaultMessageFormatterTest is more of an integration test.
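   
   For anyone wanting to try it, the behaviour is exercised roughly like so (a 
sketch; `print.partition` is the property this PR wires up):
   
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
      --topic test --from-beginning \
      --property print.partition=true --property print.key=true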







[GitHub] [kafka] ableegoldman commented on pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-24 Thread GitBox


ableegoldman commented on pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#issuecomment-679394512


   test this please







[GitHub] [kafka] ableegoldman commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-24 Thread GitBox


ableegoldman commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r475919413



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessor;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class KStreamSlidingWindowAggregateTest {
+private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+private final String threadId = Thread.currentThread().getName();
+
+@SuppressWarnings("unchecked")
+@Test
+public void testAggregateSmallInput() {
+final StreamsBuilder builder = new StreamsBuilder();
+final String topic = "topic";
+
+final KTable<Windowed<String>, String> table = builder
+.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+.aggregate(
+MockInitializer.STRING_INIT,
+MockAggregator.TOSTRING_ADDER,
+Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+);
+final MockProcessorSupplier<Windowed<String>, String> supplier = new 
MockProcessorSupplier<>();
+table.toStream().process(supplier);
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+final TestInputTopic<String, String> inputTopic =
+driver.createInputTopic(topic, new StringSerializer(), new 
StringSerializer());
+inputTopic.pipeInput("A", "1", 10L);
+inputTopic.pipeInput("A", "2", 15L);
+inputTopic.pipeInput("A", "3", 20L);
+

[jira] [Updated] (KAFKA-10133) Cannot compress messages in destination cluster with MM2

2020-08-24 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10133:
---
Fix Version/s: 2.7.0

> Cannot compress messages in destination cluster with MM2
> 
>
> Key: KAFKA-10133
> URL: https://issues.apache.org/jira/browse/KAFKA-10133
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
> Environment: kafka 2.5.0 deployed via the strimzi operator 0.18
>Reporter: Steve Jacobs
>Assignee: Ning Zhang
>Priority: Minor
> Fix For: 2.7.0
>
>
> When configuring MirrorMaker 2 using Kafka Connect, it is not possible to 
> configure things such that messages are compressed in the destination 
> cluster. The dump-log output shows that batching is occurring, but no 
> compression. If this is possible, then this is a documentation bug, because I 
> can find no documentation on how to do this.
> baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: 
> -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: 
> false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 
> magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true
>  
>  





[GitHub] [kafka] ning2008wisc opened a new pull request #9215: KAFKA-10133: MM2 readme update on config

2020-08-24 Thread GitBox


ning2008wisc opened a new pull request #9215:
URL: https://github.com/apache/kafka/pull/9215


   Per https://issues.apache.org/jira/browse/KAFKA-10133, MM2 users are 
sometimes confused about specifying or overriding the default configurations 
at different levels: connector, MM2, producer/consumer. It would be great to 
clarify how to correctly set those configs in README.md, in addition to the 
example config file.
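
As one illustration of the ambiguity, it is not obvious at which level a 
producer setting such as compression belongs; the prefixing rules are exactly 
what this README change is meant to pin down. The fragment below is an assumed 
shape, not a confirmed one:

    # hypothetical mm2.properties fragment -- treat the prefix as an assumption
    B.producer.compression.type = gzip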



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

2020-08-24 Thread GitBox


ableegoldman commented on pull request #9174:
URL: https://github.com/apache/kafka/pull/9174#issuecomment-679373369


   @mjsax Be my guest  







[GitHub] [kafka] mjsax commented on pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

2020-08-24 Thread GitBox


mjsax commented on pull request #9174:
URL: https://github.com/apache/kafka/pull/9174#issuecomment-679347362


   Might it be worth cherry-picking this to `2.6`?







[GitHub] [kafka] vvcephei opened a new pull request #9214: [DO NOT MERGE] POC: unify all jvm cache pools

2020-08-24 Thread GitBox


vvcephei opened a new pull request #9214:
URL: https://github.com/apache/kafka/pull/9214


   Currently, users of Suppress in strict mode must either not configure a 
memory bound or consider a per-operator, per-partition bound. The former would 
result in the application crashing ungracefully if it needs too much memory, 
which is sub-optimal for some deployment strategies. The latter is nice for 
determinism, but is difficult to configure in practice.
   
   In addition to suppress buffers, Streams has a record cache configuration. 
Currently, we make the assumption that all threads would probably use a uniform 
amount of cache space, but this assumption is clearly wrong in some cases.
   
   Finally, there are some applications that want to run multiple Streams 
instances in the same JVM, probably for running different Streams topologies.
   
   In aggregate, there are quite a few "pools" of heap space that users need to 
configure if they want to avoid an OOME, and the more threads, applications, 
and Suppress operators there are, the more granular these pools become. Of 
course, the more granular the pools are, the lower utilization of the available 
memory we will see. Plus, especially for Suppression, very granular pool 
configuration means a higher likelihood that the operator will run out of space 
and shut the app down.
   
   This POC demonstrates the feasibility of unifying all these pools with one 
logical bound on total memory usage for all caches and suppression buffers, 
across all operators/tasks and all threads, and even across all Streams 
instances in the JVM.
   
   Most of the tests pass right now, but not all of them. I also need to clean 
up a few more things before really starting a discussion.
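   
   As a conceptual illustration only (all names invented, not the POC's code), 
the "one logical bound" idea boils down to a single shared accountant that 
every cache and suppression buffer reserves from:
   
    import java.util.concurrent.atomic.AtomicLong;

    // invented sketch of a JVM-wide pool shared by caches and buffers
    public final class SharedMemoryPool {
        private final long capacityBytes;
        private final AtomicLong usedBytes = new AtomicLong();

        public SharedMemoryPool(final long capacityBytes) {
            this.capacityBytes = capacityBytes;
        }

        // try to reserve bytes; false means the global bound is hit and the
        // caller should evict or block rather than OOM
        public boolean tryReserve(final long bytes) {
            while (true) {
                final long used = usedBytes.get();
                if (used + bytes > capacityBytes) {
                    return false;
                }
                if (usedBytes.compareAndSet(used, used + bytes)) {
                    return true;
                }
            }
        }

        public void release(final long bytes) {
            usedBytes.addAndGet(-bytes);
        }
    }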
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] hachikuji merged pull request #9112: KAFKA-10312 Fix error code returned by getPartitionMetadata

2020-08-24 Thread GitBox


hachikuji merged pull request #9112:
URL: https://github.com/apache/kafka/pull/9112


   







[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

2020-08-24 Thread GitBox


ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r475800482



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
 @Override
 public void process(final K1 key, final V1 value) {
-// we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
-// If {@code keyMapper} returns {@code null} it implies there is no 
match,
-// so ignore unless it is a left join
+// we do join iff the joining keys are equal, thus, if the mappedKey 
is null we cannot join
+// and just ignore the record.
 //
 // we also ignore the record if value is null, because in a key-value 
data model a null-value indicates
 // an empty message (ie, there is nothing to be joined) -- this 
contrast SQL NULL semantics
 // furthermore, on left/outer joins 'null' in ValueJoiner#apply() 
indicates a missing record --
 // thus, to be consistent and to avoid ambiguous null semantics, null 
values are ignored
-if (key == null || value == null) {
+final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+if (!maybeMappedKey.isPresent()) {
 LOG.warn(
 "Skipping record due to null key or value. key=[{}] value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
 key, value, context().topic(), context().partition(), 
context().offset()
 );
 droppedRecordsSensor.record();
 } else {
-final K2 mappedKey = keyMapper.apply(key, value);
-final V2 value2 = mappedKey == null ? null : 
getValueOrNull(valueGetter.get(mappedKey));
+final K2 mappedKey = maybeMappedKey.get();
+final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
 if (leftJoin || value2 != null) {
 context().forward(key, joiner.apply(value, value2));
 }
 }
 }
 
+private Optional<K2> maybeExtractMappedKey(final K1 key, final V1 value) {
+if (value == null) {
+return Optional.empty();
+}
+
+// we allow the case where the key is null but mappedKey is not null 
and thus
+// we need to guard against nullPointerExceptions. This may happen for 
GlobalKTables.
+// For KTables, the keyMapper simply returns the key, so this will 
never happen
+Optional<K2> maybeMappedKey;
+try {
+maybeMappedKey = Optional.ofNullable(keyMapper.apply(key, value));
+} catch (final NullPointerException e) {

Review comment:
   Yeah the compatibility argument is reasonable, but you could say that 
users should have been handling the null case all along and it was just due to 
a bug in Streams that we never actually passed in a null key. If people 
_aren't_ handling null, we should try and alert them quickly (and nothing 
catches people's attention faster than an NPE)
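   
   For illustration, a defensive user-side mapper might look like this sketch 
(class name and types are hypothetical; `KeyValueMapper` is the real interface):
   
   ```java
   import org.apache.kafka.streams.kstream.KeyValueMapper;
   
   // Sketch of a user-supplied mapper that tolerates null record keys, which
   // can legitimately reach the mapper in KStream-GlobalKTable joins.
   public class ValueBasedKeyMapper implements KeyValueMapper<String, String, String> {
       @Override
       public String apply(final String key, final String value) {
           // derive the join key from the value alone; never dereference the key
           return value == null ? null : value.toUpperCase();
       }
   }
   ```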





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7740) Kafka Admin Client should be able to manage user/client configurations for users and clients

2020-08-24 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183512#comment-17183512
 ] 

Travis Bischel commented on KAFKA-7740:
---

Oh cool, that's great. I thought that aspect was completely removed, but I 
didn't know there was a different blocker. Sorry for the ping!

> Kafka Admin Client should be able to manage user/client configurations for 
> users and clients
> 
>
> Key: KAFKA-7740
> URL: https://issues.apache.org/jira/browse/KAFKA-7740
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0
> Environment: linux
>Reporter: Yaodong Yang
>Assignee: Brian Byrne
>Priority: Major
>  Labels: features
> Fix For: 2.6.0
>
>
> Right now, Kafka Admin Client only allow users to change the configuration of 
> brokers and topics. There are some use cases that users want to setOrUpdate 
> quota configurations for users and clients through Kafka Admin Client. 
> Without this new capability, users have to manually talk to zookeeper for 
> this, which will pose other challenges for customers.
> Considering we have already have the framework for the much complex brokers 
> and topic configuration changes, it seems straightforward to add the support 
> for the alterConfig and describeConfig for users and clients as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-08-24 Thread GitBox


guozhangwang commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r475792671



##
File path: 
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##
@@ -123,7 +123,7 @@ public void shouldAllowJoinMaterializedFilteredKTable() {
 
 assertThat(
 topology.stateStores().size(),
-equalTo(1));
+equalTo(2));

Review comment:
`enableSendingOldValues` is indicated for sending a pair of <old, new> 
values, and if it is not set the `old` value would always be null. If we want 
to `enableSendingOldValues` then we'd have to materialize the source node still?
   
   Maybe I'm not fully understanding the context here. Could you bring me up to 
date?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

2020-08-24 Thread GitBox


ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r475792745



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
 @Override
 public void process(final K1 key, final V1 value) {
-// we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
-// If {@code keyMapper} returns {@code null} it implies there is no 
match,
-// so ignore unless it is a left join
+// we do join iff the joining keys are equal, thus, if the mappedKey 
is null we cannot join
+// and just ignore the record.
 //
 // we also ignore the record if value is null, because in a key-value 
data model a null-value indicates
 // an empty message (ie, there is nothing to be joined) -- this 
contrast SQL NULL semantics
 // furthermore, on left/outer joins 'null' in ValueJoiner#apply() 
indicates a missing record --
 // thus, to be consistent and to avoid ambiguous null semantics, null 
values are ignored
-if (key == null || value == null) {
+final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+if (!maybeMappedKey.isPresent()) {
 LOG.warn(
 "Skipping record due to null key or value. key=[{}] value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
 key, value, context().topic(), context().partition(), 
context().offset()
 );
 droppedRecordsSensor.record();
 } else {
-final K2 mappedKey = keyMapper.apply(key, value);
-final V2 value2 = mappedKey == null ? null : 
getValueOrNull(valueGetter.get(mappedKey));
+final K2 mappedKey = maybeMappedKey.get();
+final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
 if (leftJoin || value2 != null) {
 context().forward(key, joiner.apply(value, value2));
 }
 }
 }
 
+private Optional<K2> maybeExtractMappedKey(final K1 key, final V1 value) {
+if (value == null) {
+return Optional.empty();

Review comment:
   This seems a little subtle. Can we just return the actual mapped key (or 
`.empty()`) in this method, and keep the explicit null check for `value` up 
above?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

2020-08-24 Thread GitBox


ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r475791126



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
 @Override
 public void process(final K1 key, final V1 value) {
-// we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
-// If {@code keyMapper} returns {@code null} it implies there is no 
match,
-// so ignore unless it is a left join
+// we do join iff the joining keys are equal, thus, if the mappedKey 
is null we cannot join
+// and just ignore the record.
 //
 // we also ignore the record if value is null, because in a key-value 
data model a null-value indicates
 // an empty message (ie, there is nothing to be joined) -- this 
contrast SQL NULL semantics
 // furthermore, on left/outer joins 'null' in ValueJoiner#apply() 
indicates a missing record --
 // thus, to be consistent and to avoid ambiguous null semantics, null 
values are ignored
-if (key == null || value == null) {
+final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+if (!maybeMappedKey.isPresent()) {
 LOG.warn(
 "Skipping record due to null key or value. key=[{}] value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
 key, value, context().topic(), context().partition(), 
context().offset()
 );
 droppedRecordsSensor.record();
 } else {
-final K2 mappedKey = keyMapper.apply(key, value);
-final V2 value2 = mappedKey == null ? null : 
getValueOrNull(valueGetter.get(mappedKey));

Review comment:
   Oh yeah, duh. Nevermind this  





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10409) Refactor Kafka Streams RocksDb iterators

2020-08-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10409:
-
Labels: newbie  (was: )

> Refactor Kafka Streams RocksDb iterators 
> -
>
> Key: KAFKA-10409
> URL: https://issues.apache.org/jira/browse/KAFKA-10409
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Minor
>  Labels: newbie
>
> From [https://github.com/apache/kafka/pull/9137#discussion_r470345513] :
> [~ableegoldman] : 
> > Kind of unrelated, but WDYT about renaming {{RocksDBDualCFIterator}} to 
> > {{RocksDBDualCFAllIterator}} or something on the side? I feel like these 
> > iterators could be cleaned up a bit in general to be more understandable – 
> > for example, it's weird that we do the {{iterator#seek}}-ing in the actual 
> > {{all()}} method but for range queries we do the seeking inside the 
> > iterator constructor.
> and [https://github.com/apache/kafka/pull/9137#discussion_r470361726] :
> > Personally I found the {{RocksDBDualCFIterator}} logic a bit difficult to 
> > follow even before the reverse iteration, so it would be nice to have some 
> > tests specifically covering reverse iterators over multi-column-family 
> > timestamped stores
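
A sketch of the constructor-side seeking suggested above (illustrative only; the 
class and field names are hypothetical, not existing code):
{code:java}
import org.rocksdb.RocksIterator;

// Illustrative sketch: do the seeking in the constructor, as the range
// iterators already do, so all() and range() construct iterators symmetrically.
class RocksDBDualCFAllIterator {
    private final RocksIterator noTimestamps;
    private final RocksIterator withTimestamps;

    RocksDBDualCFAllIterator(final RocksIterator noTimestamps,
                             final RocksIterator withTimestamps) {
        this.noTimestamps = noTimestamps;
        this.withTimestamps = withTimestamps;
        // seek eagerly here instead of inside all()
        noTimestamps.seekToFirst();
        withTimestamps.seekToFirst();
    }
}
{code}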



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10417) suppress() with cogroup() throws ClassCastException

2020-08-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10417:
-
Fix Version/s: 3.0.0
   2.8.0

> suppress() with cogroup() throws ClassCastException
> ---
>
> Key: KAFKA-10417
> URL: https://issues.apache.org/jira/browse/KAFKA-10417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Wardha Perinkada Kattu
>Priority: Blocker
>  Labels: kafka-streams
> Fix For: 3.0.0, 2.8.0
>
>
> Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` 
> throws `ClassCastException`
> Works fine without the `suppress()`
> Code block tested -
> {code:java}
> val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
> .groupByKey(Grouped.with(Serdes.String(), 
> serdesConfig.notificationSerde()))
> val streams2 = confirmationStreams
> .groupByKey(Grouped.with(Serdes.String(), 
> serdesConfig.confirmationsSerde()))
> val cogrouped = 
> stream1.cogroup(notificationAggregator).cogroup(streams2, 
> confirmationsAggregator)
> 
> .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong())))
> .aggregate({ null }, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("time-windowed-aggregated-stream-store")
> .withValueSerde(serdesConfig.notificationMetricSerde()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()))
> .toStream()
> {code}
> Exception thrown is:
> {code:java}
> Caused by: java.lang.ClassCastException: class 
> org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to 
> class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier 
> (org.apache.kafka.streams.kstream.internals.PassThrough and 
> org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in 
> unnamed module of loader 'app')
> {code}
> [https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10427) Implement FetchSnapshot RPC

2020-08-24 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-10427:
--

 Summary: Implement FetchSnapshot RPC
 Key: KAFKA-10427
 URL: https://issues.apache.org/jira/browse/KAFKA-10427
 Project: Kafka
  Issue Type: Sub-task
  Components: core, replication
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-08-24 Thread GitBox


bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-679258872


   > I should just delete the test for DefaultMessageFormatter in 
ConsoleConsumerTest.scala.
   
   @badaiaqrandista
   
   I'm not sure.  My vote would be to keep the test but move it over to the 
`DefaultMessageFormatterTest.scala` class.  
   
   But I'm not familiar enough with this code to say for sure.  From a quick 
look at the old test, it's not clear to me why it failed.  I guess the 
`Partition number` gets printed by default now?
   
   \cc @dajac 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-24 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r475771230



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ResourceNotFoundException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture<List<String>> usersFuture;
+private final Map<String, KafkaFuture<UserScramCredentialsDescription>> perUserFutures;
+
+/**
+ *
+ * @param usersFuture the future indicating the users described by the call
+ * @param perUserFutures the required map of user names to futures 
representing the results of describing each
+ *   user's SCRAM credentials.
+ */
+public DescribeUserScramCredentialsResult(KafkaFuture<List<String>> usersFuture,
+  Map<String, KafkaFuture<UserScramCredentialsDescription>> perUserFutures) {
+this.usersFuture = Objects.requireNonNull(usersFuture);
+this.perUserFutures = Objects.requireNonNull(perUserFutures);
+}
+
+/**
+ *
+ * @return a future for the results of all requested (either explicitly or 
implicitly via describe-all) users.
+ * The future will complete successfully only if the users future first 
completes successfully and then all the
+ * futures for the user descriptions complete successfully.
+ */
+public KafkaFuture<Map<String, UserScramCredentialsDescription>> all() {
+KafkaFuture<Void> succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(users());
+return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> {
+KafkaFuture<Void> succeedsOnlyIfAllDescriptionsSucceed = KafkaFuture.allOf(perUserFutures.values().toArray(
+new KafkaFuture[perUserFutures.size()]));
+KafkaFuture<Map<String, UserScramCredentialsDescription>> mapFuture = succeedsOnlyIfAllDescriptionsSucceed.thenApply(void2 ->
+perUserFutures.entrySet().stream().collect(Collectors.toMap(
+e -> e.getKey(),
+e -> valueFromFutureGuaranteedToSucceedAtThisPoint(e.getValue()))));
+/* At this point it is only the users future that is guaranteed to 
have succeeded.
+ * We want to return the future to the map, but we have to return 
a map at this point.
+ * We need to dereference the future while propagating any 
exception.
+ */
+return valueFromFuturePropagatingExceptionsAsUnchecked(mapFuture);
+});
+}
+
+/**
+ *
+ * @return a future indicating the distinct users that were requested 
(either explicitly or implicitly via
+ * describe-all).  The future will not complete successfully if the user 
is not authorized to perform the describe
+ * operation; otherwise, it will complete successfully as long as the list 
of users with credentials can be
+ * successfully determined within some hard-coded timeout period.
+ */
+public KafkaFuture<List<String>> users() {
+return usersFuture;
+}
+
+/**
+ *
+ * @param userName the name of the user description being requested
+ * @return a future indicating the description results for the given user. 
The future will complete exceptionally if
+ * the future returned by {@link #users()} completes exceptionally.  If 
the given user does not exist in the list
+ * of requested users then the future will complete exceptionally with
+ * {@link org.apache.kafka.common.errors.ResourceNotFoundException}.
+ */
+public KafkaFuture<UserScramCredentialsDescription> description(String userName) {
+KafkaFuture<Void> succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(usersFuture);
+return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> {
+   

[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-24 Thread GitBox


junrao commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-679256967


   @chia7712 : Sorry for the late response. I just realized there seems to be 
another issue in addition to the above one that I mentioned. The second issue 
is that we hold a group lock while calling 
`joinPurgatory.tryCompleteElseWatch`. In this call, it's possible that 
DelayedJoin.onComplete() will be called. In that case, since the caller holds 
the group lock, we won't be completing partitionsToComplete in 
completeDelayedRequests().
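   
   To illustrate the shape of the problem, a self-contained sketch (not the actual 
purgatory code; the names are stand-ins):
   
   ```java
   import java.util.concurrent.locks.ReentrantLock;
   
   // Stand-in sketch: if the purgatory completes the operation inline while the
   // caller still holds the group lock, the "complete delayed requests" step is
   // skipped and nothing retries it later.
   public class InlineCompletionHazard {
       static final ReentrantLock groupLock = new ReentrantLock();
   
       static void onComplete() { // stands in for DelayedJoin.onComplete()
           if (groupLock.isHeldByCurrentThread()) {
               System.out.println("caller holds the group lock -> completion deferred, never retried");
           }
       }
   
       static void tryCompleteElseWatch() {
           onComplete(); // the watch call may complete the operation inline
       }
   
       public static void main(String[] args) {
           groupLock.lock();
           try {
               tryCompleteElseWatch();
           } finally {
               groupLock.unlock();
           }
       }
   }
   ```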



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #9200: MINOR: mirror integration tests should not call System.exit

2020-08-24 Thread GitBox


rajinisivaram commented on pull request #9200:
URL: https://github.com/apache/kafka/pull/9200#issuecomment-679249066


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Reopened] (KAFKA-10133) Cannot compress messages in destination cluster with MM2

2020-08-24 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang reopened KAFKA-10133:


There is no bug in the code, but some documentation effort is needed to clarify 
where frequently used configs should be set.

> Cannot compress messages in destination cluster with MM2
> 
>
> Key: KAFKA-10133
> URL: https://issues.apache.org/jira/browse/KAFKA-10133
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
> Environment: kafka 2.5.0 deployed via the strimzi operator 0.18
>Reporter: Steve Jacobs
>Assignee: Ning Zhang
>Priority: Minor
>
> When configuring mirrormaker2 using kafka connect, it is not possible to 
> configure things such that messages are compressed in the destination 
> cluster. Dump Log shows that batching is occurring, but no compression. If 
> this is possible, then this is a documentation bug, because I can find no 
> documentation on how to do this.
> baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: 
> -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: 
> false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 
> magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10133) Cannot compress messages in destination cluster with MM2

2020-08-24 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang reassigned KAFKA-10133:
--

Assignee: Ning Zhang

> Cannot compress messages in destination cluster with MM2
> 
>
> Key: KAFKA-10133
> URL: https://issues.apache.org/jira/browse/KAFKA-10133
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
> Environment: kafka 2.5.0 deployed via the strimzi operator 0.18
>Reporter: Steve Jacobs
>Assignee: Ning Zhang
>Priority: Minor
>
> When configuring mirrormaker2 using kafka connect, it is not possible to 
> configure things such that messages are compressed in the destination 
> cluster. Dump Log shows that batching is occurring, but no compression. If 
> this is possible, then this is a documentation bug, because I can find no 
> documentation on how to do this.
> baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: 
> -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: 
> false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 
> magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-24 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r475756645



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ResourceNotFoundException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture<List<String>> usersFuture;
+private final Map<String, KafkaFuture<UserScramCredentialsDescription>> perUserFutures;
+
+/**
+ *
+ * @param usersFuture the future indicating the users described by the call
+ * @param perUserFutures the required map of user names to futures 
representing the results of describing each
+ *   user's SCRAM credentials.
+ */
+public DescribeUserScramCredentialsResult(KafkaFuture<List<String>> usersFuture,
+  Map<String, KafkaFuture<UserScramCredentialsDescription>> perUserFutures) {
+this.usersFuture = Objects.requireNonNull(usersFuture);
+this.perUserFutures = Objects.requireNonNull(perUserFutures);
+}
+
+/**
+ *
+ * @return a future for the results of all requested (either explicitly or 
implicitly via describe-all) users.
+ * The future will complete successfully only if the users future first 
completes successfully and then all the
+ * futures for the user descriptions complete successfully.
+ */
+public KafkaFuture<Map<String, UserScramCredentialsDescription>> all() {
+KafkaFuture<Void> succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(users());
+return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> {
+KafkaFuture<Void> succeedsOnlyIfAllDescriptionsSucceed = KafkaFuture.allOf(perUserFutures.values().toArray(
+new KafkaFuture[perUserFutures.size()]));
+KafkaFuture<Map<String, UserScramCredentialsDescription>> mapFuture = succeedsOnlyIfAllDescriptionsSucceed.thenApply(void2 ->
+perUserFutures.entrySet().stream().collect(Collectors.toMap(
+e -> e.getKey(),
+e -> valueFromFutureGuaranteedToSucceedAtThisPoint(e.getValue()))));
+/* At this point it is only the users future that is guaranteed to 
have succeeded.
+ * We want to return the future to the map, but we have to return 
a map at this point.
+ * We need to dereference the future while propagating any 
exception.
+ */
+return valueFromFuturePropagatingExceptionsAsUnchecked(mapFuture);
+});
+}
+
+/**
+ *
+ * @return a future indicating the distinct users that were requested 
(either explicitly or implicitly via

Review comment:
   We've gone back and forth on this.  The KIP does not explicitly state 
what to do in the case of a describe request for a user that does not have 
credentials, and we originally coded it to silently drop them, but then we 
changed it to be consistent with other APIs and raise an error 
(https://github.com/apache/kafka/pull/9032#discussion_r468871453).  I agree 
that it isn't totally clear what to do.  Rather than making the change back, 
I'll leave both this Javadoc and the underlying implementation as-is right now 
until we discuss further and decide for sure what we want.
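   
   For reference, caller-side handling under the current behavior would look 
roughly like this (a sketch assuming the Admin API shape proposed in this PR):
   
   ```java
   import java.util.Collections;
   import java.util.Properties;
   import java.util.concurrent.ExecutionException;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.DescribeUserScramCredentialsResult;
   import org.apache.kafka.common.errors.ResourceNotFoundException;
   
   // Sketch: a described user without SCRAM credentials surfaces as a
   // ResourceNotFoundException on that user's individual future.
   public class DescribeScramExample {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               DescribeUserScramCredentialsResult result =
                   admin.describeUserScramCredentials(Collections.singletonList("alice"));
               try {
                   System.out.println(result.description("alice").get());
               } catch (ExecutionException e) {
                   if (e.getCause() instanceof ResourceNotFoundException) {
                       System.out.println("alice has no SCRAM credentials");
                   }
               }
           }
       }
   }
   ```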





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10133) Cannot compress messages in destination cluster with MM2

2020-08-24 Thread Steve Jacobs (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183445#comment-17183445
 ] 

Steve Jacobs commented on KAFKA-10133:
--

That would be incredibly helpful. Just an example of where this setting goes 
would save a lot of trouble!
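
For what it's worth, one plausible shape is a cluster-scoped producer override in 
mm2.properties (unverified sketch; the exact prefix depends on the MM2 version, so 
check the README update being prepared for this ticket):
{code}
# hypothetical mm2.properties sketch -- verify the exact override prefix
# against the MM2 README for your version
clusters = source, target
source.bootstrap.servers = source-kafka:9092
target.bootstrap.servers = target-kafka:9092
# compress batches written to the target cluster
target.producer.compression.type = gzip
# some versions route client overrides through Connect's KIP-458 mechanism:
# target.producer.override.compression.type = gzip
{code}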

> Cannot compress messages in destination cluster with MM2
> 
>
> Key: KAFKA-10133
> URL: https://issues.apache.org/jira/browse/KAFKA-10133
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
> Environment: kafka 2.5.0 deployed via the strimzi operator 0.18
>Reporter: Steve Jacobs
>Priority: Minor
>
> When configuring mirrormaker2 using kafka connect, it is not possible to 
> configure things such that messages are compressed in the destination 
> cluster. Dump Log shows that batching is occurring, but no compression. If 
> this is possible, then this is a documentation bug, because I can find no 
> documentation on how to do this.
> baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: 
> -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: 
> false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 
> magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lbradstreet opened a new pull request #9213: MINOR: add epoch lineage checks to system tests

2020-08-24 Thread GitBox


lbradstreet opened a new pull request #9213:
URL: https://github.com/apache/kafka/pull/9213


   This adds assertions to check that leader epoch lineages match between
   replicas. These have been added to system tests that involve broker
   restarts and that wait for replicas to rejoin the ISR by the end of the
   test.
   
   I also moved wait_until_rejoin_isr from downgrade_test and upgrade_test
as the implementation was the same in each.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-24 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r475750058



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ResourceNotFoundException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture<List<String>> usersFuture;
+private final Map<String, KafkaFuture<UserScramCredentialsDescription>> perUserFutures;
+
+/**
+ *
+ * @param usersFuture the future indicating the users described by the call
+ * @param perUserFutures the required map of user names to futures 
representing the results of describing each
+ *   user's SCRAM credentials.
+ */
+public DescribeUserScramCredentialsResult(KafkaFuture<List<String>> usersFuture,
+  Map<String, KafkaFuture<UserScramCredentialsDescription>> perUserFutures) {
+this.usersFuture = Objects.requireNonNull(usersFuture);
+this.perUserFutures = Objects.requireNonNull(perUserFutures);
+}
+
+/**
+ *
+ * @return a future for the results of all requested (either explicitly or 
implicitly via describe-all) users.
+ * The future will complete successfully only if the users future first 
completes successfully and then all the
+ * futures for the user descriptions complete successfully.
+ */
+public KafkaFuture<Map<String, UserScramCredentialsDescription>> all() {
+KafkaFuture<Void> succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(users());
+return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> {
+KafkaFuture<Void> succeedsOnlyIfAllDescriptionsSucceed = KafkaFuture.allOf(perUserFutures.values().toArray(
+new KafkaFuture[perUserFutures.size()]));
+KafkaFuture<Map<String, UserScramCredentialsDescription>> mapFuture = succeedsOnlyIfAllDescriptionsSucceed.thenApply(void2 ->
+perUserFutures.entrySet().stream().collect(Collectors.toMap(
+e -> e.getKey(),
+e -> valueFromFutureGuaranteedToSucceedAtThisPoint(e.getValue()))));
+/* At this point it is only the users future that is guaranteed to 
have succeeded.
+ * We want to return the future to the map, but we have to return 
a map at this point.
+ * We need to dereference the future while propagating any 
exception.
+ */
+return valueFromFuturePropagatingExceptionsAsUnchecked(mapFuture);
+});
+}
+
+/**
+ *
+ * @return a future indicating the distinct users that were requested 
(either explicitly or implicitly via
+ * describe-all).  The future will not complete successfully if the user 
is not authorized to perform the describe
+ * operation; otherwise, it will complete successfully as long as the list 
of users with credentials can be
+ * successfully determined within some hard-coded timeout period.
+ */
+public KafkaFuture<List<String>> users() {
+return usersFuture;
+}
+
+/**
+ *
+ * @param userName the name of the user description being requested
+ * @return a future indicating the description results for the given user. 
The future will complete exceptionally if
+ * the future returned by {@link #users()} completes exceptionally.  If 
the given user does not exist in the list
+ * of requested users then the future will complete exceptionally with
+ * {@link org.apache.kafka.common.errors.ResourceNotFoundException}.
+ */
+public KafkaFuture<UserScramCredentialsDescription> description(String userName) {
+KafkaFuture<Void> succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(usersFuture);
+return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> {
+   

[GitHub] [kafka] chia7712 opened a new pull request #9212: MINOR: don't keep reference of receive buffer when the value of head …

2020-08-24 Thread GitBox


chia7712 opened a new pull request #9212:
URL: https://github.com/apache/kafka/pull/9212


   Users who want to avoid null variables may use an empty array instead of null 
as a header value, so the header serves as a kind of "flag". In that case 
```RecordHeader#value()``` may never be called (the user only checks for the 
header's existence), and so the receive buffer can't be released by GC until the 
user gets rid of the ```Header```.
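   
   For illustration, the retention hazard looks roughly like this (a sketch; names 
are illustrative):
   
   ```java
   import java.nio.ByteBuffer;
   import org.apache.kafka.common.header.Header;
   import org.apache.kafka.common.header.internals.RecordHeader;
   
   // Sketch of the hazard: a header whose value is a slice of the network
   // receive buffer keeps the whole buffer reachable even if value() is never
   // called, e.g. when the header is only used as an existence "flag".
   public class HeaderRetentionExample {
       public static Header flagHeader(final ByteBuffer receiveBuffer) {
           final ByteBuffer slice = receiveBuffer.slice(); // shares the backing buffer
           return new RecordHeader("my-flag", slice);      // pins receiveBuffer via the slice
       }
   }
   ```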
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10426) Deadlock in KafkaConfigBackingStore

2020-08-24 Thread Goltseva Taisiia (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Goltseva Taisiia updated KAFKA-10426:
-
Reviewer: Konstantine Karantasis  (was: Chris Egerton)

> Deadlock in KafkaConfigBackingStore
> ---
>
> Key: KAFKA-10426
> URL: https://issues.apache.org/jira/browse/KAFKA-10426
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.1, 2.6.0
>Reporter: Goltseva Taisiia
>Assignee: Goltseva Taisiia
>Priority: Critical
>  Labels: pull-request-available
>
> Hi, guys!
> We faced the following deadlock:
>  
> {code:java}
> KafkaBasedLog Work Thread - _streaming_service_config
> priority:5 - threadId:0x7f18ec22c000 - nativeId:0x950 - nativeId 
> (decimal):2384 - state:BLOCKED
> stackTrace:
> java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> com.company.streaming.platform.kafka.DistributedHerder$ConfigUpdateListener.onSessionKeyUpdate(DistributedHerder.java:1586)
> - waiting to lock <0xe6136808> (a 
> com.company.streaming.platform.kafka.DistributedHerder)
> at 
> org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:707)
> - locked <0xd8c3be40> (a java.lang.Object)
> at 
> org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:481)
> at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:264)
> at 
> org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:71)
> at 
> org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:337)
> CustomDistributedHerder-connect-1
> priority:5 - threadId:0x7f1a01e30800 - nativeId:0x93a - nativeId 
> (decimal):2362 - state:BLOCKED
> stackTrace:
> java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> org.apache.kafka.connect.storage.KafkaConfigBackingStore.snapshot(KafkaConfigBackingStore.java:285)
> - waiting to lock <0xd8c3be40> (a java.lang.Object)
> at 
> com.company.streaming.platform.kafka.DistributedHerder.updateConfigsWithIncrementalCooperative(DistributedHerder.java:514)
> - locked <0xe6136808> (a 
> com.company.streaming.platform.kafka.DistributedHerder)
> at 
> com.company.streaming.platform.kafka.DistributedHerder.tick(DistributedHerder.java:402)
> at 
> com.company.streaming.platform.kafka.DistributedHerder.run(DistributedHerder.java:286)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748){code}
> DistributedHerder entered the synchronized method 
> updateConfigsWithIncrementalCooperative() and called 
> configBackingStore.snapshot(), which takes a lock on an internal object in the 
> KafkaConfigBackingStore class.
>  
> Meanwhile, KafkaConfigBackingStore's ConsumeCallback, inside a synchronized 
> block on that internal object, got a SESSION_KEY record and called 
> updateListener.onSessionKeyUpdate(), which takes a lock on the DistributedHerder.
>  
> As I can see the problem is here:
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L737]
>  
> As I understand it, this call should be performed outside the synchronized block:
> {code:java}
> if (started)
>
> updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);{code}
>  
> I'm going to make a PR.
>  
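
A minimal sketch of that reshuffle (illustrative, not the actual patch): snapshot 
state under the monitor, then invoke the listener after releasing it:
{code:java}
// Illustrative sketch: capture what the listener needs while holding the
// monitor, then call out after releasing it, so the listener can take its
// own locks without risking a lock-order inversion.
class SessionKeyStore {
    interface UpdateListener { void onSessionKeyUpdate(String sessionKey); }

    private final Object lock = new Object();
    private final UpdateListener updateListener;
    private String sessionKey;
    private boolean started;

    SessionKeyStore(final UpdateListener listener) {
        this.updateListener = listener;
    }

    void start() {
        synchronized (lock) { started = true; }
    }

    void onSessionKeyRecord(final String newKey) {
        final String keyToNotify;
        synchronized (lock) {
            sessionKey = newKey;
            keyToNotify = started ? sessionKey : null;
        }
        if (keyToNotify != null)            // callback happens outside the monitor
            updateListener.onSessionKeyUpdate(keyToNotify);
    }
}
{code}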



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10426) Deadlock in KafkaConfigBackingStore

2020-08-24 Thread Goltseva Taisiia (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Goltseva Taisiia updated KAFKA-10426:
-
Reviewer: Chris Egerton  (was: Konstantine Karantasis)

> Deadlock in KafkaConfigBackingStore
> ---
>
> Key: KAFKA-10426
> URL: https://issues.apache.org/jira/browse/KAFKA-10426
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.1, 2.6.0
>Reporter: Goltseva Taisiia
>Assignee: Goltseva Taisiia
>Priority: Critical
>  Labels: pull-request-available
>
> Hi, guys!
> We faced the following deadlock:
>  
> {code:java}
> KafkaBasedLog Work Thread - _streaming_service_config
> priority:5 - threadId:0x7f18ec22c000 - nativeId:0x950 - nativeId 
> (decimal):2384 - state:BLOCKED
> stackTrace:
> java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> com.company.streaming.platform.kafka.DistributedHerder$ConfigUpdateListener.onSessionKeyUpdate(DistributedHerder.java:1586)
> - waiting to lock <0xe6136808> (a 
> com.company.streaming.platform.kafka.DistributedHerder)
> at 
> org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:707)
> - locked <0xd8c3be40> (a java.lang.Object)
> at 
> org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:481)
> at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:264)
> at 
> org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:71)
> at 
> org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:337)
> CustomDistributedHerder-connect-1
> priority:5 - threadId:0x7f1a01e30800 - nativeId:0x93a - nativeId 
> (decimal):2362 - state:BLOCKED
> stackTrace:
> java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> org.apache.kafka.connect.storage.KafkaConfigBackingStore.snapshot(KafkaConfigBackingStore.java:285)
> - waiting to lock <0xd8c3be40> (a java.lang.Object)
> at 
> com.company.streaming.platform.kafka.DistributedHerder.updateConfigsWithIncrementalCooperative(DistributedHerder.java:514)
> - locked <0xe6136808> (a 
> com.company.streaming.platform.kafka.DistributedHerder)
> at 
> com.company.streaming.platform.kafka.DistributedHerder.tick(DistributedHerder.java:402)
> at 
> com.company.streaming.platform.kafka.DistributedHerder.run(DistributedHerder.java:286)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748){code}
> DistributedHerder entered the synchronized method 
> updateConfigsWithIncrementalCooperative() and called 
> configBackingStore.snapshot(), which takes a lock on an internal object in the 
> KafkaConfigBackingStore class.
>  
> Meanwhile, KafkaConfigBackingStore's ConsumeCallback, inside a synchronized 
> block on that internal object, got a SESSION_KEY record and called 
> updateListener.onSessionKeyUpdate(), which takes a lock on the DistributedHerder.
>  
> As I can see the problem is here:
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L737]
>  
> As I understand it, this call should be performed outside the synchronized block:
> {code:java}
> if (started)
>
> updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);{code}
>  
> I'm going to make a PR.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-08-24 Thread GitBox


cmccabe commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r475725389



##
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##
@@ -316,7 +316,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
 kafkaController = new KafkaController(config, zkClient, time, metrics, 
brokerInfo, brokerEpoch, tokenManager, threadNamePrefix)
 kafkaController.startup()
 
-brokerToControllerChannelManager = new 
BrokerToControllerChannelManager(metadataCache, time, metrics, config, 
threadNamePrefix)
+if (config.redirectionEnabled) {
+  brokerToControllerChannelManager = new 
BrokerToControllerChannelManager(metadataCache, time, metrics, config, 
threadNamePrefix)

Review comment:
   Just as a note, the alter ISR PR may also have an object like this, so 
maybe we want a name that is more specific to redirection.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10133) Cannot compress messages in destination cluster with MM2

2020-08-24 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183401#comment-17183401
 ] 

Ning Zhang commented on KAFKA-10133:


Great. Then I will probably make a PR to clarify some use cases like this. 
Thanks

> Cannot compress messages in destination cluster with MM2
> 
>
> Key: KAFKA-10133
> URL: https://issues.apache.org/jira/browse/KAFKA-10133
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
> Environment: kafka 2.5.0 deployed via the strimzi operator 0.18
>Reporter: Steve Jacobs
>Priority: Minor
>
> When configuring mirrormaker2 using kafka connect, it is not possible to 
> configure things such that messages are compressed in the destination 
> cluster. Dump Log shows that batching is occurring, but no compression. If 
> this is possible, then this is a documentation bug, because I can find no 
> documentation on how to do this.
> baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: 
> -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: 
> false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 
> magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-08-24 Thread GitBox


cmccabe commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r475714230



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
##
@@ -87,6 +87,16 @@ public Builder(Map configs, boolean 
validateOnly) {
 public AlterConfigsRequest build(short version) {
 return new AlterConfigsRequest(data, version);
 }
+

Review comment:
   In general we don't define equals or hashCode on these builders.  Why 
are we defining it here?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-08-24 Thread GitBox


cmccabe commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r475712573



##
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##
@@ -325,7 +326,9 @@
 UNSTABLE_OFFSET_COMMIT(88, "There are unstable offsets that need to be 
cleared.", UnstableOffsetCommitException::new),
 THROTTLING_QUOTA_EXCEEDED(89, "The throttling quota has been exceeded.", 
ThrottlingQuotaExceededException::new),
 PRODUCER_FENCED(90, "There is a newer producer with the same 
transactionalId " +
-"which fences the current one.", ProducerFencedException::new);
+"which fences the current one.", ProducerFencedException::new),
+BROKER_AUTHORIZATION_FAILURE(91, "Authorization failed for the request 
during forwarding. " +

Review comment:
   How about: "A broker failed to authorize itself to another component of 
the system.  This indicates an internal error on the broker cluster security 
setup".
   
   This isn't specific to forwarding... there might be other reasons why a 
broker would need to authorize itself and fail





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-08-24 Thread GitBox


cmccabe commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r475710400



##
File path: 
clients/src/main/java/org/apache/kafka/common/errors/BrokerAuthorizationFailureException.java
##
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Exception used to indicate a broker side authorization failure during 
request redirection.
+ */
+public class BrokerAuthorizationFailureException extends 
AuthorizationException {
+

Review comment:
   Need to include:
   ```
   private static final long serialVersionUID = 1L;
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-24 Thread GitBox


cmccabe commented on pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#issuecomment-679199653


   @rajinisivaram : I'd like to understand your suggestion to forbid 
authenticating via delegation token here.  It doesn't seem consistent with how 
we handle delegation tokens in general, so I might be missing something.  It 
seems like a lot of administrative systems may use delegation tokens and this 
would make this API not useful for them.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-24 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r475698813



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ResourceNotFoundException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture<List<String>> usersFuture;
+private final Map<String, KafkaFuture<UserScramCredentialsDescription>> perUserFutures;
+
+/**
+ *
+ * @param usersFuture the future indicating the users described by the call
+ * @param perUserFutures the required map of user names to futures 
representing the results of describing each
+ *   user's SCRAM credentials.
+ */
+public DescribeUserScramCredentialsResult(KafkaFuture<List<String>> usersFuture,
+  Map<String, KafkaFuture<UserScramCredentialsDescription>> perUserFutures) {
+this.usersFuture = Objects.requireNonNull(usersFuture);
+this.perUserFutures = Objects.requireNonNull(perUserFutures);
+}
+
+/**
+ *
+ * @return a future for the results of all requested (either explicitly or 
implicitly via describe-all) users.
+ * The future will complete successfully only if the users future first 
completes successfully and then all the
+ * futures for the user descriptions complete successfully.
+ */
+public KafkaFuture<Map<String, UserScramCredentialsDescription>> all() {
+KafkaFuture<Void> succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(users());
+return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> {
+KafkaFuture<Void> succeedsOnlyIfAllDescriptionsSucceed = KafkaFuture.allOf(perUserFutures.values().toArray(
+new KafkaFuture[perUserFutures.size()]));
+KafkaFuture<Map<String, UserScramCredentialsDescription>> mapFuture = succeedsOnlyIfAllDescriptionsSucceed.thenApply(void2 ->
+perUserFutures.entrySet().stream().collect(Collectors.toMap(
+e -> e.getKey(),
+e -> valueFromFutureGuaranteedToSucceedAtThisPoint(e.getValue()))));
+/* At this point it is only the users future that is guaranteed to 
have succeeded.
+ * We want to return the future to the map, but we have to return 
a map at this point.
+ * We need to dereference the future while propagating any 
exception.
+ */
+return valueFromFuturePropagatingExceptionsAsUnchecked(mapFuture);
+});
+}
+
+/**
+ *
+ * @return a future indicating the distinct users that were requested 
(either explicitly or implicitly via

Review comment:
   I think it would be better to make this "a future indicating the users 
that were listed" (rather than "requested").  It's maybe a bit of a subtle 
distinction but think about things like requesting the null user, or the empty 
string user.  It's awkward to put that here.  I think if we explicitly request 
a user but it doesn't exist, it should be omitted from here as well.  That 
gives us more flexibility in the future with the API.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-24 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r475696554



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ResourceNotFoundException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture<List<String>> usersFuture;
+private final Map<String, KafkaFuture<UserScramCredentialsDescription>> perUserFutures;
+
+/**
+ *
+ * @param usersFuture the future indicating the users described by the call
+ * @param perUserFutures the required map of user names to futures 
representing the results of describing each
+ *   user's SCRAM credentials.
+ */
+public DescribeUserScramCredentialsResult(KafkaFuture<List<String>> usersFuture,
+  Map<String, KafkaFuture<UserScramCredentialsDescription>> perUserFutures) {
+this.usersFuture = Objects.requireNonNull(usersFuture);
+this.perUserFutures = Objects.requireNonNull(perUserFutures);
+}
+
+/**
+ *
+ * @return a future for the results of all requested (either explicitly or 
implicitly via describe-all) users.
+ * The future will complete successfully only if the users future first 
completes successfully and then all the
+ * futures for the user descriptions complete successfully.
+ */
+public KafkaFuture<Map<String, UserScramCredentialsDescription>> all() {
+KafkaFuture<Void> succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(users());
+return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> {
+KafkaFuture<Void> succeedsOnlyIfAllDescriptionsSucceed = KafkaFuture.allOf(perUserFutures.values().toArray(
+new KafkaFuture[perUserFutures.size()]));
+KafkaFuture<Map<String, UserScramCredentialsDescription>> mapFuture = succeedsOnlyIfAllDescriptionsSucceed.thenApply(void2 ->
+perUserFutures.entrySet().stream().collect(Collectors.toMap(
+e -> e.getKey(),
+e -> valueFromFutureGuaranteedToSucceedAtThisPoint(e.getValue()))));
+/* At this point it is only the users future that is guaranteed to 
have succeeded.
+ * We want to return the future to the map, but we have to return 
a map at this point.
+ * We need to dereference the future while propagating any 
exception.
+ */
+return valueFromFuturePropagatingExceptionsAsUnchecked(mapFuture);
+});
+}
+
+/**
+ *
+ * @return a future indicating the distinct users that were requested 
(either explicitly or implicitly via
+ * describe-all).  The future will not complete successfully if the user 
is not authorized to perform the describe
+ * operation; otherwise, it will complete successfully as long as the list 
of users with credentials can be
+ * successfully determined within some hard-coded timeout period.
+ */
+public KafkaFuture<List<String>> users() {
+return usersFuture;
+}
+
+/**
+ *
+ * @param userName the name of the user description being requested
+ * @return a future indicating the description results for the given user. 
The future will complete exceptionally if
+ * the future returned by {@link #users()} completes exceptionally.  If 
the given user does not exist in the list
+ * of requested users then the future will complete exceptionally with
+ * {@link org.apache.kafka.common.errors.ResourceNotFoundException}.
+ */
+public KafkaFuture<UserScramCredentialsDescription> description(String userName) {
+KafkaFuture<Void> succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(usersFuture);
+return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> {
+

[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-24 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r475695750



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ResourceNotFoundException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+private final KafkaFuture<List<String>> usersFuture;
+private final Map<String, KafkaFuture<UserScramCredentialsDescription>> perUserFutures;
+
+/**
+ *
+ * @param usersFuture the future indicating the users described by the call
+ * @param perUserFutures the required map of user names to futures 
representing the results of describing each
+ *   user's SCRAM credentials.
+ */
+public DescribeUserScramCredentialsResult(KafkaFuture<List<String>> usersFuture,
+  Map<String, KafkaFuture<UserScramCredentialsDescription>> perUserFutures) {
+this.usersFuture = Objects.requireNonNull(usersFuture);
+this.perUserFutures = Objects.requireNonNull(perUserFutures);
+}
+
+/**
+ *
+ * @return a future for the results of all requested (either explicitly or 
implicitly via describe-all) users.
+ * The future will complete successfully only if the users future first 
completes successfully and then all the
+ * futures for the user descriptions complete successfully.
+ */
+public KafkaFuture<Map<String, UserScramCredentialsDescription>> all() {
+KafkaFuture<Void> succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(users());
+return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> {
+KafkaFuture<Void> succeedsOnlyIfAllDescriptionsSucceed = KafkaFuture.allOf(perUserFutures.values().toArray(
+new KafkaFuture[perUserFutures.size()]));
+KafkaFuture<Map<String, UserScramCredentialsDescription>> mapFuture = succeedsOnlyIfAllDescriptionsSucceed.thenApply(void2 ->
+perUserFutures.entrySet().stream().collect(Collectors.toMap(
+e -> e.getKey(),
+e -> valueFromFutureGuaranteedToSucceedAtThisPoint(e.getValue()))));
+/* At this point it is only the users future that is guaranteed to 
have succeeded.
+ * We want to return the future to the map, but we have to return 
a map at this point.
+ * We need to dereference the future while propagating any 
exception.
+ */
+return valueFromFuturePropagatingExceptionsAsUnchecked(mapFuture);
+});
+}
+
+/**
+ *
+ * @return a future indicating the distinct users that were requested 
(either explicitly or implicitly via
+ * describe-all).  The future will not complete successfully if the user 
is not authorized to perform the describe
+ * operation; otherwise, it will complete successfully as long as the list 
of users with credentials can be
+ * successfully determined within some hard-coded timeout period.
+ */
+public KafkaFuture<List<String>> users() {
+return usersFuture;
+}
+
+/**
+ *
+ * @param userName the name of the user description being requested
+ * @return a future indicating the description results for the given user. 
The future will complete exceptionally if
+ * the future returned by {@link #users()} completes exceptionally.  If 
the given user does not exist in the list
+ * of requested users then the future will complete exceptionally with
+ * {@link org.apache.kafka.common.errors.ResourceNotFoundException}.
+ */
+public KafkaFuture<UserScramCredentialsDescription> description(String userName) {
+KafkaFuture<Void> succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(usersFuture);
+return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> {
+

[GitHub] [kafka] cmccabe commented on pull request #9194: KAFKA-10384: Separate converters from generated messages

2020-08-24 Thread GitBox


cmccabe commented on pull request #9194:
URL: https://github.com/apache/kafka/pull/9194#issuecomment-679187796


   > Ok, just to make sure I understand [...] Now for a given schema, we will 
generate SomeMessageData as well as SomeMessageJsonConverter
   
   Right, you got it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7740) Kafka Admin Client should be able to manage user/client configurations for users and clients

2020-08-24 Thread Brian Byrne (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183338#comment-17183338
 ] 

Brian Byrne commented on KAFKA-7740:


Hello - the resolve functionality is still planned for a future release; however, 
it got hung up on an interaction with the `ClientQuotaCallback`. A KIP to address 
that issue will be required. I can update the original KIP to indicate the resolve 
functionality is not yet available.
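
For reference, altering a user quota through the Admin client without touching 
ZooKeeper can be sketched as follows, assuming the KIP-546 quotas API that 
shipped in 2.6 (broker address, user name and quota value below are examples):

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;

import java.util.Collections;
import java.util.Properties;

public class UserQuotaExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // target the quota at user "alice"
            ClientQuotaEntity entity = new ClientQuotaEntity(
                Collections.singletonMap(ClientQuotaEntity.USER, "alice"));
            // cap the producer byte rate at 1 MB/s
            ClientQuotaAlteration alteration = new ClientQuotaAlteration(entity,
                Collections.singleton(new ClientQuotaAlteration.Op("producer_byte_rate", 1048576.0)));
            admin.alterClientQuotas(Collections.singleton(alteration)).all().get();
        }
    }
}
{code}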

> Kafka Admin Client should be able to manage user/client configurations for 
> users and clients
> 
>
> Key: KAFKA-7740
> URL: https://issues.apache.org/jira/browse/KAFKA-7740
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0
> Environment: linux
>Reporter: Yaodong Yang
>Assignee: Brian Byrne
>Priority: Major
>  Labels: features
> Fix For: 2.6.0
>
>
> Right now, the Kafka Admin Client only allows users to change the configuration 
> of brokers and topics. There are some use cases where users want to set or 
> update quota configurations for users and clients through the Kafka Admin 
> Client. Without this capability, users have to talk to ZooKeeper manually, 
> which poses other challenges for customers.
> Considering we already have the framework for the more complex broker and 
> topic configuration changes, it seems straightforward to add support for 
> alterConfig and describeConfig for users and clients as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2020-08-24 Thread Flavien Raynaud (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183304#comment-17183304
 ] 

Flavien Raynaud commented on KAFKA-8733:


Has there been any update regarding this issue/the associated KIP? I can see 
that the thread on the mailing list has been empty for the past 6 months. It 
has happened again recently when one broker encountered a disk failure, causing 
a bunch of offline partitions. Happy to help in any way we can.

> Offline partitions occur when leader's disk is slow in reads while responding 
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.2, 2.4.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We found the offline partitions issue multiple times on some of the hosts in our 
> clusters. After going through the broker logs and hosts' disk stats, it 
> looks like this issue occurs whenever the read/write operations take more 
> time on that disk. In a particular case where read time is more than the 
> replica.lag.time.max.ms, follower replicas will be out of sync as their 
> earlier fetch requests are stuck while reading the local log and their fetch 
> status is not yet updated as mentioned in the below code of `ReplicaManager`. 
> If there is an issue in reading the data from the log for a duration more 
> than replica.lag.time.max.ms then all the replicas will be out of sync and 
> partition becomes offline if min.isr.replicas > 1 and unclean.leader.election 
> is false.
>  
> {code:java}
> def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
>   val result = readFromLocalLog( // this call took more than 
> `replica.lag.time.max.ms`
>   replicaId = replicaId,
>   fetchOnlyFromLeader = fetchOnlyFromLeader,
>   readOnlyCommitted = fetchOnlyCommitted,
>   fetchMaxBytes = fetchMaxBytes,
>   hardMaxBytesLimit = hardMaxBytesLimit,
>   readPartitionInfo = fetchInfos,
>   quota = quota,
>   isolationLevel = isolationLevel)
>   if (isFromFollower) updateFollowerLogReadResults(replicaId, result). // 
> fetch time gets updated here, but mayBeShrinkIsr should have been already 
> called and the replica is removed from isr
>  else result
>  }
> val logReadResults = readFromLog()
> {code}
> Attached the graphs of disk weighted io time stats when this issue occurred.
> I will raise [KIP-501|https://s.apache.org/jhbpn] describing options on how 
> to handle this scenario.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-24 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r475626573



##
File path: config/log4j.properties
##
@@ -61,8 +61,8 @@ 
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 log4j.logger.org.apache.zookeeper=INFO
 
 # Change the two lines below to adjust the general broker logging level 
(output to server.log and stdout)
-log4j.logger.kafka=INFO
-log4j.logger.org.apache.kafka=INFO
+log4j.logger.kafka=DEBUG

Review comment:
   Need to revert this stuff, didn't mean to commit





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] xakassi opened a new pull request #9211: KAFKA-10426: Deadlock on session key update.

2020-08-24 Thread GitBox


xakassi opened a new pull request #9211:
URL: https://github.com/apache/kafka/pull/9211


   DistributedHerder enters the synchronized method 
updateConfigsWithIncrementalCooperative() and calls configBackingStore.snapshot(), 
which takes a lock on an internal object in the KafkaConfigBackingStore class.
   
   Meanwhile, KafkaConfigBackingStore's ConsumeCallback, inside a synchronized 
block on that internal object, gets a SESSION_KEY record and calls 
updateListener.onSessionKeyUpdate(), which takes a lock on the DistributedHerder.
   
   So we have a deadlock.
   
   To avoid this, the updateListener with the new session key should be invoked 
outside the synchronized block (see the sketch below), as is already done, for 
example, for updateListener.onTaskConfigUpdate(updatedTasks).
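   
   A self-contained sketch of the pattern (class and member names here are 
illustrative, not the actual Connect classes):
   
   ```java
   public class CallbackOutsideLockExample {
       interface UpdateListener {
           void onSessionKeyUpdate(String key);
       }
   
       private final Object internalLock = new Object();
       private String sessionKey;
       private boolean started = true;
   
       void onCompletion(String newKey, UpdateListener updateListener) {
           String updatedKey = null;
           synchronized (internalLock) {
               sessionKey = newKey;          // the state change stays guarded
               if (started)
                   updatedKey = sessionKey;  // remember what to publish
           }
           // The listener runs with no lock held, so it is free to take the
           // herder lock without creating a lock-order inversion.
           if (updatedKey != null)
               updateListener.onSessionKeyUpdate(updatedKey);
       }
   }
   ```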
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-24 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r475632719



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1257,4 +1364,4 @@ class Partition(val topicPartition: TopicPartition,
 }
 partitionString.toString
   }
-}
+}

Review comment:
   newline





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-24 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r475625111



##
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##
@@ -325,7 +326,8 @@
 UNSTABLE_OFFSET_COMMIT(88, "There are unstable offsets that need to be 
cleared.", UnstableOffsetCommitException::new),
 THROTTLING_QUOTA_EXCEEDED(89, "The throttling quota has been exceeded.", 
ThrottlingQuotaExceededException::new),
 PRODUCER_FENCED(90, "There is a newer producer with the same 
transactionalId " +
-"which fences the current one.", ProducerFencedException::new);
+"which fences the current one.", ProducerFencedException::new),
+INVALID_UPDATE_VERSION(91, "The given ISR version was out-of-date.", 
InvalidUpdateVersionException::new);

Review comment:
   This error message should be less specific





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-24 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r475630695



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -806,8 +840,9 @@ class Partition(val topicPartition: TopicPartition,
   // avoid unnecessary collection generation
   var newHighWatermark = leaderLog.logEndOffsetMetadata
   remoteReplicasMap.values.foreach { replica =>
+// Note here we are using effectiveInSyncReplicaIds, see explanation 
above

Review comment:
   Fix comment to refer to correct variable





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-24 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r475629593



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -635,7 +666,7 @@ class Partition(val topicPartition: TopicPartition,
 
 // check if we need to expand ISR to include this replica
 // if it is not in the ISR yet

Review comment:
   Expand on this comment to discuss the maximal ISR





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-24 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r475627290



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -141,7 +142,8 @@ object Partition extends KafkaMetricsGroup {
   stateStore = zkIsrBackingStore,
   delayedOperations = delayedOperations,
   metadataCache = replicaManager.metadataCache,
-  logManager = replicaManager.logManager)
+  logManager = replicaManager.logManager,
+  alterIsrChannelManager = replicaManager.alterIsrManager)

Review comment:
   Rename to alterIsrManager





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10133) Cannot compress messages in destination cluster with MM2

2020-08-24 Thread Steve Jacobs (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183300#comment-17183300
 ] 

Steve Jacobs commented on KAFKA-10133:
--

That is how I was confirming compression as well. The example you gave is the 
same as what I said above: it sets the property on the Kafka Connect worker, 
not on the source connector/checkpoint connector, etc. 

> Cannot compress messages in destination cluster with MM2
> 
>
> Key: KAFKA-10133
> URL: https://issues.apache.org/jira/browse/KAFKA-10133
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
> Environment: kafka 2.5.0 deployed via the strimzi operator 0.18
>Reporter: Steve Jacobs
>Priority: Minor
>
> When configuring mirrormaker2 using kafka connect, it is not possible to 
> configure things such that messages are compressed in the destination 
> cluster. Dump Log shows that batching is occuring, but no compression. If 
> this is possible, then this is a documentation bug, because I can find no 
> documentation on how to do this.
> baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: 
> -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: 
> false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 
> magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337

2020-08-24 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban resolved KAFKA-10414.
--
Resolution: Not A Problem

api-util is only a test dependency, not an issue.

> Upgrade api-util dependency - CVE-2018-1337
> ---
>
> Key: KAFKA-10414
> URL: https://issues.apache.org/jira/browse/KAFKA-10414
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> There is a dependency on org.apache.directory.api:api-util:1.0.0, which is 
> involved in CVE-2018-1337. The issue is fixed in api-util 1.0.2 and later.
> This is a transitive dependency through the apacheds libs.
> -Can be fixed by upgrading to at least version 2.0.0.AM25-
> Since api-all is also a dependency, and there is a class collision between 
> api-all and newer version of api-util, it is better to just upgrade api-util 
> to 1.0.2



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10362) When resuming Streams active task with EOS, the checkpoint file should be deleted

2020-08-24 Thread Ilia Pasynkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183104#comment-17183104
 ] 

Ilia Pasynkov commented on KAFKA-10362:
---

[~high.lee] Hello, yes I'm working on this task)

> When resuming Streams active task with EOS, the checkpoint file should be 
> deleted
> -
>
> Key: KAFKA-10362
> URL: https://issues.apache.org/jira/browse/KAFKA-10362
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie++
>
> Today when we suspend a task we commit, and along with the commit we always 
> write the checkpoint file even if we are eosEnabled (since the state is already 
> SUSPENDED). But the suspended task may later be resumed and in that case the 
> checkpoint file should be deleted since it should only be written when it is 
> cleanly closed.
> With our latest rebalance protocol in KIP-429, resume would not be called 
> since all suspended tasks would be closed, but with the old eager protocol it 
> may still be called — I think that may be the reason we did not get it often.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10362) When resuming Streams active task with EOS, the checkpoint file should be deleted

2020-08-24 Thread highluck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183075#comment-17183075
 ] 

highluck commented on KAFKA-10362:
--

[~ipasynkov] 
Are you working on it? If not, can I do PR?

> When resuming Streams active task with EOS, the checkpoint file should be 
> deleted
> -
>
> Key: KAFKA-10362
> URL: https://issues.apache.org/jira/browse/KAFKA-10362
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie++
>
> Today when we suspend a task we commit, and along with the commit we always 
> write the checkpoint file even if we are eosEnabled (since the state is already 
> SUSPENDED). But the suspended task may later be resumed and in that case the 
> checkpoint file should be deleted since it should only be written when it is 
> cleanly closed.
> With our latest rebalance protocol in KIP-429, resume would not be called 
> since all suspended tasks would be closed, but with the old eager protocol it 
> may still be called — I think that may be the reason we did not get it often.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10426) Deadlock in KafkaConfigBackingStore

2020-08-24 Thread Goltseva Taisiia (Jira)
Goltseva Taisiia created KAFKA-10426:


 Summary: Deadlock in KafkaConfigBackingStore
 Key: KAFKA-10426
 URL: https://issues.apache.org/jira/browse/KAFKA-10426
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.6.0, 2.4.1
Reporter: Goltseva Taisiia


Hi, guys!

We faced the following deadlock:

 
{code:java}
KafkaBasedLog Work Thread - _streaming_service_config
priority:5 - threadId:0x7f18ec22c000 - nativeId:0x950 - nativeId 
(decimal):2384 - state:BLOCKED
stackTrace:
java.lang.Thread.State: BLOCKED (on object monitor)
at 
com.company.streaming.platform.kafka.DistributedHerder$ConfigUpdateListener.onSessionKeyUpdate(DistributedHerder.java:1586)
- waiting to lock <0xe6136808> (a 
com.company.streaming.platform.kafka.DistributedHerder)
at 
org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:707)
- locked <0xd8c3be40> (a java.lang.Object)
at 
org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:481)
at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:264)
at org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:71)
at 
org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:337)



CustomDistributedHerder-connect-1
priority:5 - threadId:0x7f1a01e30800 - nativeId:0x93a - nativeId 
(decimal):2362 - state:BLOCKED
stackTrace:
java.lang.Thread.State: BLOCKED (on object monitor)
at 
org.apache.kafka.connect.storage.KafkaConfigBackingStore.snapshot(KafkaConfigBackingStore.java:285)
- waiting to lock <0xd8c3be40> (a java.lang.Object)
at 
com.company.streaming.platform.kafka.DistributedHerder.updateConfigsWithIncrementalCooperative(DistributedHerder.java:514)
- locked <0xe6136808> (a 
com.company.streaming.platform.kafka.DistributedHerder)
at 
com.company.streaming.platform.kafka.DistributedHerder.tick(DistributedHerder.java:402)
at 
com.company.streaming.platform.kafka.DistributedHerder.run(DistributedHerder.java:286)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748){code}

DistributedHerder entered the synchronized method 
updateConfigsWithIncrementalCooperative() and called configBackingStore.snapshot(), 
which takes a lock on an internal object in the KafkaConfigBackingStore class.

 

Meanwhile, KafkaConfigBackingStore's ConsumeCallback, inside a synchronized block 
on that internal object, got a SESSION_KEY record and called 
updateListener.onSessionKeyUpdate(), which takes a lock on the DistributedHerder.

 

As far as I can see, the problem is here:

[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L737]

 

As I understand it, this call should be performed outside the synchronized block:
{code:java}
if (started)
   
updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);{code}
 

I'm going to make a PR.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10402) Upgrade python version in system tests

2020-08-24 Thread Nikolay Izhikov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183038#comment-17183038
 ] 

Nikolay Izhikov commented on KAFKA-10402:
-

Fails:
{noformat}

test_id:
kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery.processing_guarantee=exactly_once_beta
status: FAIL
run time:   9 minutes 59.410 seconds


Never saw output 'StateChange: REBALANCING -> RUNNING' on ducker@ducker09
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 
134, in run
data = self.run_test()
  File 
"/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 
192, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.7/dist-packages/ducktape/mark/_mark.py", line 
429, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 
91, in test_failure_and_recovery
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
  File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 
110, in run_failure_and_recovery
self.add_streams(processor1)
  File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 
133, in add_streams
self.wait_for_startup(monitor, processor)
  File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 
168, in wait_for_startup
self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
  File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 
174, in wait_for
err_msg=("Never saw output '%s' on " % output) + 
str(processor.node.account))
  File 
"/usr/local/lib/python3.7/dist-packages/ducktape/cluster/remoteaccount.py", 
line 708, in wait_until
allow_fail=True) == 0, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/ducktape/utils/util.py", line 
41, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg)
ducktape.errors.TimeoutError: Never saw output 'StateChange: REBALANCING -> 
RUNNING' on ducker@ducker09


test_id:
kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once
status: FAIL
run time:   10 minutes 15.336 seconds


Never saw output 'StateChange: REBALANCING -> RUNNING' on ducker@ducker05
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 
134, in run
data = self.run_test()
  File 
"/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 
192, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.7/dist-packages/ducktape/mark/_mark.py", line 
429, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 
100, in test_failure_and_recovery_complex
StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
  File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 
110, in run_failure_and_recovery
self.add_streams(processor1)
  File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 
133, in add_streams
self.wait_for_startup(monitor, processor)
  File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 
168, in wait_for_startup
self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
  File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 
174, in wait_for
err_msg=("Never saw output '%s' on " % output) + 
str(processor.node.account))
  File 
"/usr/local/lib/python3.7/dist-packages/ducktape/cluster/remoteaccount.py", 
line 708, in wait_until
allow_fail=True) == 0, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/ducktape/utils/util.py", line 
41, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg)
ducktape.errors.TimeoutError: Never saw output 'StateChange: REBALANCING -> 
RUNNING' on ducker@ducker05




test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_bounce.clean=False.connect_protocol=compatible
status: FAIL
run time:   6 minutes 8.566 seconds


Found validation errors:
Not enough messages were processed: source:0 sink:0
  Not enough messages 

[jira] [Comment Edited] (KAFKA-10402) Upgrade python version in system tests

2020-08-24 Thread Nikolay Izhikov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183036#comment-17183036
 ] 

Nikolay Izhikov edited comment on KAFKA-10402 at 8/24/20, 8:05 AM:
---

Tests results:

Full report in  [^report.txt] 

{noformat}

SESSION REPORT (ALL TESTS)
ducktape version: 0.8.0
session_id:   2020-08-23--002
run time: 1010 minutes 46.483 seconds
tests run:684
passed:   505
failed:   9
ignored:  170
{noformat}


was (Author: nizhikov):
Tests results:

{noformat}

SESSION REPORT (ALL TESTS)
ducktape version: 0.8.0
session_id:   2020-08-23--002
run time: 1010 minutes 46.483 seconds
tests run:684
passed:   505
failed:   9
ignored:  170
{noformat}

> Upgrade python version in system tests
> --
>
> Key: KAFKA-10402
> URL: https://issues.apache.org/jira/browse/KAFKA-10402
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Nikolay Izhikov
>Assignee: Nikolay Izhikov
>Priority: Major
> Attachments: report.txt
>
>
> Currently, the system tests use Python 2, which is outdated and not supported.
> Since all dependencies of the system tests, including ducktape, support 
> Python 3, we can migrate the system tests to Python 3.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10402) Upgrade python version in system tests

2020-08-24 Thread Nikolay Izhikov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183036#comment-17183036
 ] 

Nikolay Izhikov commented on KAFKA-10402:
-

Tests results:

{noformat}

SESSION REPORT (ALL TESTS)
ducktape version: 0.8.0
session_id:   2020-08-23--002
run time: 1010 minutes 46.483 seconds
tests run:684
passed:   505
failed:   9
ignored:  170
{noformat}

> Upgrade python version in system tests
> --
>
> Key: KAFKA-10402
> URL: https://issues.apache.org/jira/browse/KAFKA-10402
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Nikolay Izhikov
>Assignee: Nikolay Izhikov
>Priority: Major
> Attachments: report.txt
>
>
> Currently, the system tests use Python 2, which is outdated and not supported.
> Since all dependencies of the system tests, including ducktape, support 
> Python 3, we can migrate the system tests to Python 3.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10402) Upgrade python version in system tests

2020-08-24 Thread Nikolay Izhikov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikolay Izhikov updated KAFKA-10402:

Attachment: report.txt

> Upgrade python version in system tests
> --
>
> Key: KAFKA-10402
> URL: https://issues.apache.org/jira/browse/KAFKA-10402
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Nikolay Izhikov
>Assignee: Nikolay Izhikov
>Priority: Major
> Attachments: report.txt
>
>
> Currently, the system tests use Python 2, which is outdated and not supported.
> Since all dependencies of the system tests, including ducktape, support 
> Python 3, we can migrate the system tests to Python 3.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10425) Documentation switches to a random page on clicking the left navigation bar hide/expand button

2020-08-24 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183034#comment-17183034
 ] 

Luke Chen commented on KAFKA-10425:
---

I'm having the same issue, too.

 

> Documentation switches to a random page on clicking the left navigation bar 
> hide/expand button
> --
>
> Key: KAFKA-10425
> URL: https://issues.apache.org/jira/browse/KAFKA-10425
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Affects Versions: 2.6.0, 2.5.1
>Reporter: Sanjay Ravikumar
>Priority: Minor
>  Labels: documentation
>
> The Kafka documentation includes a button to hide or expand left navigation 
> bar. On clicking that button, the documentation switches to a random page. 
> For example, while I'm on 
> [https://kafka.apache.org/documentation.html#hwandos|https://kafka.apache.org/25/documentation.html#hwandos],
>  if I click that hide button, the page switches to some page further down in 
> the documentation. Similarly, if I'm on a certain page with left navigation 
> bar hidden, and when I click that button, the page switches  to a page 
> further up in the documentation.
> This might be happening due to page resizing when that button is clicked. The 
> issue is present in both 2.6.0 and 2.5.1 versions. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-24 Thread GitBox


showuon commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r475393747



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws 
IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws 
IOException {
+final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), 
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");

Review comment:
   It's because we will write data to the .tmp file first and then swap it 
to the CHECKPOINT_FILE. And in the swap action, we use `Files.move` with the 
`ATOMIC_MOVE` option, which will try to replace the target file if it exists, so I 
cannot trigger an `IOException` in this case. I added comments on this line to 
explain the reason. Thank you.
   ref: 
https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#move-java.nio.file.Path-java.nio.file.Path-java.nio.file.CopyOption...-
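   
   A runnable sketch of the write-then-swap pattern described above (file names 
are examples; replacing a read-only target is what I observed on POSIX, and per 
the javadoc it can be platform specific):
   
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;
   
   public class AtomicCheckpointWriteDemo {
       public static void main(String[] args) throws IOException {
           Path dir = Files.createTempDirectory("checkpoint-demo");
           Path tmp = dir.resolve("checkpoint.tmp");
           Path target = dir.resolve("checkpoint");
   
           Files.write(target, "old contents".getBytes());
           target.toFile().setWritable(false);   // a read-only target...
   
           // 1. write everything to the .tmp file first
           Files.write(tmp, "0 25".getBytes());
   
           // 2. swap into place; the atomic move replaces the existing target,
           //    so the read-only flag on it never surfaces as an IOException
           Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
           System.out.println(Files.readAllLines(target));   // prints [0 25]
       }
   }
   ```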





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-24 Thread GitBox


showuon commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r475393747



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws 
IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws 
IOException {
+final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), 
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");

Review comment:
   It's because we will write data to the .tmp file first and then swap it 
to the CHECKPOINT_FILE. And in the swap action, we use `Files.move` with the 
`ATOMIC_MOVE` option, which will try to replace the target file if it exists, so I 
cannot trigger an `IOException` in this case. I added comments on this line 
to explain the reason. Thank you.
   ref: 
https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#move-java.nio.file.Path-java.nio.file.Path-java.nio.file.CopyOption...-





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-24 Thread GitBox


showuon edited a comment on pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#issuecomment-678959902


   @mjsax , I've updated in this commit: 
https://github.com/apache/kafka/pull/9121/commits/12d3826a87a5b21033e2f81c6a486353e79d8591.
 Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-24 Thread GitBox


showuon commented on pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#issuecomment-678959902


   @mjsax , I've updated in this commit: 
https://github.com/apache/kafka/pull/9121/commits/b248ccc4f35ce6bb7ef865c65392710c1558ca20.
 Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-24 Thread GitBox


showuon commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r475393747



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws 
IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws 
IOException {
+final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), 
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");

Review comment:
   It's because we will write data to the .tmp file first and then swap it 
to the CHECKPOINT_FILE. And in the swap action, we use `Files.move` with the 
`ATOMIC_MOVE` option, which will try to replace the target file if it exists. I 
added comments on this line to explain the reason. Thank you.
   ref: 
https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#move-java.nio.file.Path-java.nio.file.Path-java.nio.file.CopyOption...-





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"

2020-08-24 Thread Mikhail Grinfeld (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17182710#comment-17182710
 ] 

Mikhail Grinfeld edited comment on KAFKA-10424 at 8/24/20, 6:25 AM:


I can see the same behaviour with KTable topics generated by a KStreams app :(


was (Author: grinfeld):
I can see the same behaviour with KTable topics generated by a KStreams app, but 
when I create a topic with "*kafka-topics.sh --bootstrap-server localhost:9092 
--create --topic my-topic --partitions 1 --replication-factor 1 --config 
max.message.bytes=64000 --config flush.messages=1*" the topic is not replicated at 
all. I'll check my configuration, maybe I have some buggy conf in my app :(

> MirrorMaker 2.0 does not replicates topic's "clean.policy"
> --
>
> Key: KAFKA-10424
> URL: https://issues.apache.org/jira/browse/KAFKA-10424
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Mikhail Grinfeld
>Priority: Major
>
> I needed to replicate the schema-registry "_schemas" topic. 
> Data was replicated successfully and everything looked good, but the new 
> schema-registry started with a warning that the replicated topic's cleanup.policy 
> is not "compact".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"

2020-08-24 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17182977#comment-17182977
 ] 

Ning Zhang commented on KAFKA-10424:


Actually, I did a quick validation on a real deployment, and the first impression 
is: 

"cleanup.policy=compact" is not replicated to the remote topic on the target 
cluster.

So it seems like a bug to me and needs looking into.
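
In the meantime, the missing policy can be patched by hand on the target 
cluster; a minimal sketch using the Admin client (the bootstrap address and 
replicated topic name below are examples):

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;

public class FixReplicatedTopicPolicy {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "target:9092");
        try (Admin admin = Admin.create(props)) {
            // the replicated topic on the target cluster
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "primary._schemas");
            // set cleanup.policy=compact, the config MM2 failed to carry over
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(
                Collections.singletonMap(topic, Collections.singletonList(op))).all().get();
        }
    }
}
{code}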

> MirrorMaker 2.0 does not replicates topic's "clean.policy"
> --
>
> Key: KAFKA-10424
> URL: https://issues.apache.org/jira/browse/KAFKA-10424
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Mikhail Grinfeld
>Priority: Major
>
> I needed to replicate the schema-registry "_schemas" topic. 
> Data was replicated successfully and everything looked good, but the new 
> schema-registry started with a warning that the replicated topic's cleanup.policy 
> is not "compact".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-24 Thread GitBox


showuon commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r475362316



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws 
IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws 
IOException {
+final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), 
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
+file.createNewFile();
+// set the checkpoint tmp file to read-only to simulate the 
IOException situation
+file.setWritable(false);
+
+try (final LogCaptureAppender appender =
+ 
LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) {
+
+// checkpoint should fail due to the file is readonly
+stateManager.checkpoint();
+assertThat(appender.getMessages(), hasItem(containsString(
+"Failed to write offset checkpoint file to " + 
checkpointFile.getPath() + " for global stores")));

Review comment:
   I agree it'll be better to do the assertion after the try-block. But no, 
we can't move the assert out of the try-block because the `appender` is 
declared within the try block. We could move the assert out of the try-block if 
we didn't use the try-with-resources auto-close pattern, but I don't think that 
would be better. 
   Also, we assert within the try-block for the `appender` tests in other places. I 
think they are all there for the same reason as I mentioned above. 
   Thanks.
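   
   A tiny self-contained illustration of the scoping constraint (the `Capture` 
class is a toy stand-in for `LogCaptureAppender`, for illustration only):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   public class TryScopeDemo {
       // toy stand-in: captured messages are released when the resource closes
       static class Capture implements AutoCloseable {
           final List<String> messages = new ArrayList<>();
           @Override
           public void close() {
               messages.clear();
           }
       }
   
       public static void main(String[] args) {
           try (Capture appender = new Capture()) {
               appender.messages.add("Failed to write offset checkpoint file");
               // the assertion must live inside the block: 'appender' is scoped
               // to it, and its captured messages vanish once it closes
               System.out.println(appender.messages.stream()
                   .anyMatch(m -> m.contains("checkpoint")));   // true
           }
           // 'appender' is not in scope here, which is the constraint above
       }
   }
   ```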





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-24 Thread GitBox


showuon commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r475362316



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws 
IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws 
IOException {
+final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), 
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
+file.createNewFile();
+// set the checkpoint tmp file to read-only to simulate the 
IOException situation
+file.setWritable(false);
+
+try (final LogCaptureAppender appender =
+ 
LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) {
+
+// checkpoint should fail due to the file is readonly
+stateManager.checkpoint();
+assertThat(appender.getMessages(), hasItem(containsString(
+"Failed to write offset checkpoint file to " + 
checkpointFile.getPath() + " for global stores")));

Review comment:
   No, we can't move the assert out of the try-block because the `appender` 
is declared within the try block. We could move the assert out of the try-block 
if we didn't use the try-with-resources auto-close pattern, but I don't think 
that would be better. 
   Also, we assert within the try-block for the `LogCaptureAppender` in other 
places. I think they are all there for the same reason as I mentioned above. 
   Thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-24 Thread GitBox


showuon commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r475362316



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws 
IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws 
IOException {
+final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), 
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
+file.createNewFile();
+// set the checkpoint tmp file to read-only to simulate the 
IOException situation
+file.setWritable(false);
+
+try (final LogCaptureAppender appender =
+ 
LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) {
+
+// checkpoint should fail due to the file is readonly
+stateManager.checkpoint();
+assertThat(appender.getMessages(), hasItem(containsString(
+"Failed to write offset checkpoint file to " + 
checkpointFile.getPath() + " for global stores")));

Review comment:
   I agree it would be better to do the assertion after the try block. But no, we can't move the assert out of the try block because the `appender` is declared within the try block. We could move the assert out of the try block if we didn't use the try-with-resources auto-close pattern, but I don't think that would be better.
   Also, we assert within the try block for the `LogCaptureAppender` in other places; I think they all do so for the same reason I mentioned above.
   Thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10133) Cannot compress messages in destination cluster with MM2

2020-08-24 Thread Ning Zhang (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17182976#comment-17182976 ]

Ning Zhang commented on KAFKA-10133:


[~steveatbat] My experiment seems to show a different conclusion.

In order to preserve the compression type from the upstream producer, I override the producer config in the MM2 config file, such as:

{code:java}
.producer.compression.type = gzip
{code}
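
For context, a minimal `mm2.properties` sketch of where such an override sits. The cluster aliases (`primary`, `backup`) and broker addresses are hypothetical, and the alias prefix that was elided before `.producer` above is assumed to be the target cluster's alias:

{code:java}
# hypothetical cluster aliases and connectivity
clusters = primary, backup
primary.bootstrap.servers = primary-kafka:9092
backup.bootstrap.servers = backup-kafka:9092

# replicate the test topic from primary into backup
primary->backup.enabled = true
primary->backup.topics = test

# per-cluster producer override: recompress batches written to the target
backup.producer.compression.type = gzip
{code}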

Then run this command on the target cluster to verify:

{code:java}
/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --files /kafka/kafka-logs-kafka2-0/primary.test-0/.log
{code}

Output:

{code:java}
Starting offset: 0
offset: 0 position: 0 CreateTime: 1598249096409 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: GZIP producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []
offset: 1 position: 0 CreateTime: 1598249106432 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: GZIP producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []
offset: 2 position: 0 CreateTime: 1598249158144 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: GZIP producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []
{code}

The output shows that the messages replicated to the target cluster are compressed with GZIP.

So I am wondering what exact config you used at the Kafka Connect worker level to make compression work.
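
As one possible answer to that question, here is a sketch of the Connect-level route from KIP-458: connector client overrides via the `producer.override.` prefix, which must be permitted by the worker's override policy. Whether MM2's connectors honor this for compression is exactly what this thread is probing, so treat it as a starting point, not a confirmed fix:

{code:java}
# worker config: allow connectors to override producer/consumer configs (KIP-458)
connector.client.config.override.policy = All

# MirrorSourceConnector config: recompress on the producer side
producer.override.compression.type = gzip
{code}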



> Cannot compress messages in destination cluster with MM2
> 
>
> Key: KAFKA-10133
> URL: https://issues.apache.org/jira/browse/KAFKA-10133
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
> Environment: kafka 2.5.0 deployed via the strimzi operator 0.18
>Reporter: Steve Jacobs
>Priority: Minor
>
> When configuring mirrormaker2 using kafka connect, it is not possible to 
> configure things such that messages are compressed in the destination 
> cluster. Dump Log shows that batching is occurring, but no compression. If 
> this is possible, then this is a documentation bug, because I can find no 
> documentation on how to do this.
> baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)