[GitHub] [kafka] kowshik commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

2021-05-12 Thread GitBox


kowshik commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r631584810



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1204,13 +1202,12 @@ class Log(@volatile private var _dir: File,
 // Do the read on the segment with a base offset less than the target 
offset
 // but if that segment doesn't contain any messages with an offset 
greater than that
 // continue to read from successive segments until we get some 
messages or we reach the end of the log
-var done = segmentEntryOpt.isEmpty
+var done = segmentOpt.isEmpty
 var fetchDataInfo: FetchDataInfo = null
+val segmentsIterator = segmentOpt.map(segment => 
segments.higherSegments(segment.baseOffset))

Review comment:
   Done, made it private. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

2021-05-12 Thread GitBox


kowshik commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r631581404



##
File path: core/src/main/scala/kafka/log/LogSegments.scala
##
@@ -185,7 +184,7 @@ class LogSegments(topicPartition: TopicPartition) {
* if it exists.
*/
   @threadsafe
-  def higherEntry(offset: Long): Option[Map.Entry[JLong, LogSegment]] = 
Option(segments.higherEntry(offset))
+  def higherEntry(offset: Long): Option[Map.Entry[Long, LogSegment]] = 
Option(segments.higherEntry(offset))

Review comment:
   That's one reason. The other is that `Map.Entry` provided a better 
interface for attribute lookup via `Map.Entry.getKey()` and 
`Map.Entry.getValue()`, as compared to `._1` and `._2` in a tuple.
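   
   For illustration, a self-contained sketch of the accessor difference (plain `java.util.TreeMap`, not the PR's code):
   ```scala
   import java.util.TreeMap

   val m = new TreeMap[java.lang.Long, String]()
   m.put(0L, "segment-0")

   val entry = m.firstEntry()
   val base  = entry.getKey    // named accessors read clearly at the call site
   val seg   = entry.getValue

   val tuple = (base, seg)
   val b2 = tuple._1           // tuple accessors are positional and less descriptive
   val s2 = tuple._2
   ```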




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

2021-05-12 Thread GitBox


ijuma commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r631564915



##
File path: core/src/main/scala/kafka/log/LogSegments.scala
##
@@ -185,7 +184,7 @@ class LogSegments(topicPartition: TopicPartition) {
* if it exists.
*/
   @threadsafe
-  def higherEntry(offset: Long): Option[Map.Entry[JLong, LogSegment]] = 
Option(segments.higherEntry(offset))
+  def higherEntry(offset: Long): Option[Map.Entry[Long, LogSegment]] = 
Option(segments.higherEntry(offset))

Review comment:
   Why do we use `Map.Entry` instead of tuple here? Is it to avoid the 
allocation?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] romani commented on pull request #10656: MINOR: checkstyle version upgrade: 8.20 -->> 8.36.2

2021-05-12 Thread GitBox


romani commented on pull request #10656:
URL: https://github.com/apache/kafka/pull/10656#issuecomment-840296162


   When you finish the build config update that lets the Checkstyle version be defined from the CLI, please share the command with us.
   We will do 
   ```
   git clone
   git checkout ...
   Your build command
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

2021-05-12 Thread GitBox


jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r631549424



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -16,29 +16,30 @@
  */
 package kafka.raft
 
-import java.io.{File, IOException}
-import java.nio.file.{Files, NoSuchFileException}
-import java.util.concurrent.ConcurrentSkipListSet
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
 import java.util.{Optional, Properties}
 
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, 
SnapshotGenerated}
 import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, 
LogDirFailureChannel}
 import kafka.utils.{Logging, Scheduler}
 import org.apache.kafka.common.record.{MemoryRecords, Records}
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, 
LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, 
FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, 
Snapshots}
 
+import scala.annotation.nowarn
+import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
   log: Log,
   scheduler: Scheduler,
   // This object needs to be thread-safe because it is used by the 
snapshotting thread to notify the
   // polling thread when snapshots are created.
-  snapshotIds: ConcurrentSkipListSet[OffsetAndEpoch],
+  snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],

Review comment:
   No. I updated the comment. I'll push a commit tomorrow after a few other 
changes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

2021-05-12 Thread GitBox


jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r631549244



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -104,18 +105,29 @@ public static Path createTempFile(Path logDir, 
OffsetAndEpoch snapshotId) throws
 }
 
 /**
- * Delete the snapshot from the filesystem, the caller may firstly rename 
snapshot file to
- * ${file}.deleted, so we try to delete the file as well as the renamed 
file if exists.
+ * Delete the snapshot from the filesystem.
  */
-public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch 
snapshotId) {
-Path immutablePath = Snapshots.snapshotPath(logDir, snapshotId);
-Path deletingPath = Snapshots.deleteRename(immutablePath, snapshotId);
+public static boolean deleteIfExists(Path logDir, OffsetAndEpoch 
snapshotId) {
+Path immutablePath = snapshotPath(logDir, snapshotId);
+Path deletedPath = deleteRename(immutablePath, snapshotId);
 try {
-return Files.deleteIfExists(immutablePath) | 
Files.deleteIfExists(deletingPath);
+return Files.deleteIfExists(immutablePath) | 
Files.deleteIfExists(deletedPath);
 } catch (IOException e) {
-log.error("Error deleting snapshot file " + deletingPath, e);
+log.error("Error deleting snapshot files {} and {}", 
immutablePath, deletedPath, e);
 return false;
 }
 }
 
+/**
+ * Mark a snapshot for deletion by renaming with the deleted suffix
+ */
+public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
+Path immutablePath = snapshotPath(logDir, snapshotId);
+Path deletedPath = deleteRename(immutablePath, snapshotId);
+try {
+Utils.atomicMoveWithFallback(immutablePath, deletedPath, false);
+} catch (IOException e) {
+log.error("Error renaming snapshot file from {} to {}", 
immutablePath, deletedPath, e);

Review comment:
   By changing it to `UncheckedIOException`, this will unwind the stack for the polling thread. Tomorrow, I'll look into how we handle that case, but it may already shut down the broker and controller.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] DuongPTIT commented on pull request #10677: KAFKA-12380 Executor in Connect's Worker is not shut down when the worker is

2021-05-12 Thread GitBox


DuongPTIT commented on pull request #10677:
URL: https://github.com/apache/kafka/pull/10677#issuecomment-840281912


   retest this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

2021-05-12 Thread GitBox


jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r631544872



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -104,18 +105,29 @@ public static Path createTempFile(Path logDir, 
OffsetAndEpoch snapshotId) throws
 }
 
 /**
- * Delete the snapshot from the filesystem, the caller may firstly rename 
snapshot file to
- * ${file}.deleted, so we try to delete the file as well as the renamed 
file if exists.
+ * Delete the snapshot from the filesystem.
  */
-public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch 
snapshotId) {
-Path immutablePath = Snapshots.snapshotPath(logDir, snapshotId);
-Path deletingPath = Snapshots.deleteRename(immutablePath, snapshotId);
+public static boolean deleteIfExists(Path logDir, OffsetAndEpoch 
snapshotId) {
+Path immutablePath = snapshotPath(logDir, snapshotId);
+Path deletedPath = deleteRename(immutablePath, snapshotId);
 try {
-return Files.deleteIfExists(immutablePath) | 
Files.deleteIfExists(deletingPath);
+return Files.deleteIfExists(immutablePath) | 
Files.deleteIfExists(deletedPath);
 } catch (IOException e) {
-log.error("Error deleting snapshot file " + deletingPath, e);
+log.error("Error deleting snapshot files {} and {}", 
immutablePath, deletedPath, e);
 return false;
 }
 }
 
+/**
+ * Mark a snapshot for deletion by renaming with the deleted suffix
+ */
+public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
+Path immutablePath = snapshotPath(logDir, snapshotId);
+Path deletedPath = deleteRename(immutablePath, snapshotId);
+try {
+Utils.atomicMoveWithFallback(immutablePath, deletedPath, false);
+} catch (IOException e) {
+log.error("Error renaming snapshot file from {} to {}", 
immutablePath, deletedPath, e);

Review comment:
   @mumrah suggested converting all of the `IOException`s to `UncheckedIOException`. Kafka doesn't have a precedent for doing that, but maybe we should going forward. I filed https://issues.apache.org/jira/browse/KAFKA-12773, but I'll change it here to re-throw instead of logging this message.
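   
   For illustration, a minimal sketch of the re-throw pattern being discussed; the actual change lives in the Java `Snapshots` class, and the wrapping message here is only indicative:
   ```scala
   import java.io.{IOException, UncheckedIOException}
   import java.nio.file.Path
   import org.apache.kafka.common.utils.Utils

   // Sketch: surface the failure to the caller (e.g. the polling thread)
   // as an UncheckedIOException instead of logging and swallowing it here.
   def markForDelete(immutablePath: Path, deletedPath: Path): Unit = {
     try {
       Utils.atomicMoveWithFallback(immutablePath, deletedPath, false)
     } catch {
       case e: IOException =>
         throw new UncheckedIOException(
           s"Error renaming snapshot file from $immutablePath to $deletedPath", e)
     }
   }
   ```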




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

2021-05-12 Thread GitBox


ccding commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r631539787



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1204,13 +1202,12 @@ class Log(@volatile private var _dir: File,
 // Do the read on the segment with a base offset less than the target 
offset
 // but if that segment doesn't contain any messages with an offset 
greater than that
 // continue to read from successive segments until we get some 
messages or we reach the end of the log
-var done = segmentEntryOpt.isEmpty
+var done = segmentOpt.isEmpty
 var fetchDataInfo: FetchDataInfo = null
+val segmentsIterator = segmentOpt.map(segment => 
segments.higherSegments(segment.baseOffset))

Review comment:
   Given we can get baseOffset from the segment, can we get rid of `floorEntry` altogether, or make `floorEntry` private?

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1219,10 +1216,10 @@ class Log(@volatile private var _dir: File,
   fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, 
minOneMessage)
   if (fetchDataInfo != null) {
 if (includeAbortedTxns)
-  fetchDataInfo = addAbortedTransactions(startOffset, 
segmentEntry, fetchDataInfo)
-  } else segmentEntryOpt = segments.higherEntry(baseOffset)
+  fetchDataInfo = addAbortedTransactions(startOffset, segment, 
fetchDataInfo)
+  } else segmentOpt = segmentsIterator.nextOption()
 
-  done = fetchDataInfo != null || segmentEntryOpt.isEmpty
+  done = fetchDataInfo != null || segmentOpt.isEmpty

Review comment:
   can we simply use `while (fetchDataInfo == null && 
segmentOpt.isDefined)` rather than `while (!done)`?
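   
   For illustration, a rough sketch of the loop shape being suggested, reusing `segmentOpt`, `segmentsIterator` and the read parameters from the diff above (names assumed from the surrounding `read()` method, not the PR's final code):
   ```scala
   // Stop as soon as a read returns data or there are no higher segments left.
   var fetchDataInfo: FetchDataInfo = null
   while (fetchDataInfo == null && segmentOpt.isDefined) {
     fetchDataInfo = segmentOpt.get.read(startOffset, maxLength, maxPosition, minOneMessage)
     if (fetchDataInfo == null)
       segmentOpt = segmentsIterator.nextOption() // advance to the next higher segment, if any
   }
   ```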




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

2021-05-12 Thread GitBox


jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r631543566



##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
##
@@ -54,8 +54,12 @@ public Records records() {
 }
 
 @Override
-public void close() throws IOException {
-fileRecords.close();
+public void close() {
+try {
+fileRecords.close();
+} catch (IOException e) {
+throw new RuntimeException(e);

Review comment:
   I am not sure. I could use some guidance here. I read the documentation 
for `KafkaStorageException`: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java#L19-L30.
 It looks like Kafka uses `KafkaStorageException` when the IO error is visible to the client.
   
   On the server (broker and controller) this code will be called asynchronously by the same scheduler used for deleting log segments. In that case `CoreUtils.swallow` is used, which logs a WARN message. I think we should do the same here.
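   
   For reference, a minimal stand-in for the "swallow and log a WARN" pattern described above (the real helper is `kafka.utils.CoreUtils.swallow`; this sketch does not claim to reproduce its exact signature):
   ```scala
   import java.io.Closeable
   import org.slf4j.LoggerFactory

   object SwallowSketch {
     private val log = LoggerFactory.getLogger(getClass)

     // Run the action and log any failure at WARN instead of propagating it,
     // so a background cleanup task cannot take down the scheduler thread.
     def swallowWarn(action: => Unit): Unit =
       try action
       catch { case e: Throwable => log.warn("Swallowed exception from async cleanup", e) }

     // Example: closing a snapshot reader from the scheduler thread.
     def closeQuietly(reader: Closeable): Unit = swallowWarn(reader.close())
   }
   ```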




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence

2021-05-12 Thread NEERAJ VAIDYA (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343682#comment-17343682
 ] 

NEERAJ VAIDYA commented on KAFKA-12776:
---

Assuming a producer application is started but there is no cluster available.

If at this point the application gets a number of requests to publish messages (all with the same key) to Kafka, then because the Kafka producer does not have any metadata about the topic, does it arrange the messages into batches that might not necessarily go to the same partition?

When a message is retried, does the Kafka producer NOT try to fetch metadata again before creating the batch? If that is true, then I can understand why the messages are batched in a sequence where the ordering is not preserved, because it probably does not have metadata.
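
For context, a minimal sketch of the producer settings relevant to the ordering discussion above (the broker address, topic and message are placeholders; note the full property name is max.in.flight.requests.per.connection):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
// The issue above also tried limiting in-flight requests, which rules out
// reordering across retried batches; the property name ends in ".per.connection".
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord("topic-A", "same-key", "m1"))
producer.close()
```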

> Producer sends messages out-of-order inspite of enabling idempotence
> 
>
> Key: KAFKA-12776
> URL: https://issues.apache.org/jira/browse/KAFKA-12776
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0, 2.7.0
> Environment: Linux RHEL 7.9 and Ubuntu 20.04
>Reporter: NEERAJ VAIDYA
>Priority: Major
> Attachments: mocker.zip
>
>
> I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). 
> My application is basically a Spring boot web-application which accepts JSON 
> payloads via HTTP and then pushes each to a Kafka topic. I also use Spring 
> Cloud Stream Kafka in the application to create and use a Producer.
> For one of my failure handling test cases, I shutdown the Kafka cluster while 
> my applications are running. (Note : No messages have been published to the 
> Kafka cluster before I stop the cluster)
> When the producer application tries to write messages to TA, it cannot 
> because the cluster is down and hence (I assume) buffers the messages. Let's 
> say it receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is 
> first and m4 is last).
> When I bring the Kafka cluster back online, the producer sends the buffered 
> messages to the topic, but they are not in order. I receive for example, m2 
> then m3 then m1 and then m4.
> Why is that ? Is it because the buffering in the producer is multi-threaded 
> with each producing to the topic at the same time ?
> My project code is attached herewith.
> I can confirm that I have enabled idempotence. I have also tried with 
> ```max.in.flight.requests=1```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] DuongPTIT commented on pull request #10677: KAFKA-12380 Executor in Connect's Worker is not shut down when the worker is

2021-05-12 Thread GitBox


DuongPTIT commented on pull request #10677:
URL: https://github.com/apache/kafka/pull/10677#issuecomment-840264920


   Please retest this


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] DuongPTIT opened a new pull request #10685: KAFKA-12380 Executor in Connect's Worker is not shut down when the worker is

2021-05-12 Thread GitBox


DuongPTIT opened a new pull request #10685:
URL: https://github.com/apache/kafka/pull/10685


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10676: KAFKA-12648: Pt. 0 - Add TopologyMetadata.Subtopology class for subtopology metadata

2021-05-12 Thread GitBox


ableegoldman commented on pull request #10676:
URL: https://github.com/apache/kafka/pull/10676#issuecomment-840227278


   Ready for review @wcarlson5 @guozhangwang @rodesai 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

2021-05-12 Thread GitBox


kowshik commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r631501285



##
File path: core/src/main/scala/kafka/log/LogSegments.scala
##
@@ -17,7 +17,6 @@
 package kafka.log
 
 import java.io.File
-import java.lang.{Long => JLong}

Review comment:
   cc @ccding 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

2021-05-12 Thread GitBox


kowshik commented on pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#issuecomment-840215023


   cc @junrao


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

2021-05-12 Thread GitBox


kowshik commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r631501034



##
File path: core/src/main/scala/kafka/log/LogSegments.scala
##
@@ -210,11 +209,23 @@ class LogSegments(topicPartition: TopicPartition) {
* @return the entry associated with the greatest offset, if it exists.
*/
   @threadsafe
-  def lastEntry: Option[Map.Entry[JLong, LogSegment]] = 
Option(segments.lastEntry)
+  def lastEntry: Option[Map.Entry[Long, LogSegment]] = 
Option(segments.lastEntry)
 
   /**
* @return the log segment with the greatest offset, if it exists.
*/
   @threadsafe
   def lastSegment: Option[LogSegment] = lastEntry.map(_.getValue)
+
+  /**
+   * Returns an iterable to log segments ordered from lowest base offset to 
highest.
+   * Each segment in the returned iterable has a base offset strictly greater 
than the provided baseOffset.
+   */
+  def higherSegments(baseOffset: Long): Iterable[LogSegment] = {

Review comment:
   @ijuma If you think this API is a good fit to the requirement, I can add 
unit tests for it.
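   
   For context, one plausible shape for the API described in the diff above, assuming the segments are kept in a `ConcurrentSkipListMap` keyed by base offset (a sketch, not the PR's exact implementation):
   ```scala
   import java.util.concurrent.ConcurrentSkipListMap
   import scala.jdk.CollectionConverters._

   // LogSegment here is kafka.log.LogSegment, the same type used in the class under review.
   class LogSegmentsSketch {
     private val segments = new ConcurrentSkipListMap[java.lang.Long, LogSegment]()

     /** Segments whose base offset is strictly greater than `baseOffset`, ordered ascending. */
     def higherSegments(baseOffset: Long): Iterable[LogSegment] =
       segments.tailMap(baseOffset, false).values.asScala
   }
   ```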




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik opened a new pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

2021-05-12 Thread GitBox


kowshik opened a new pull request #10684:
URL: https://github.com/apache/kafka/pull/10684


   - In `Log.rebuildProducerState()` I've removed usage of the `allSegments` 
local variable.
   - In `Log.collectAbortedTransactions()` I'm now raising an exception when 
the segment can't be found.
   - I've introduced a new `LogSegments.higherEntries()` API that's now used to make the logic a bit more readable in `Log.read()`, `Log.collectAbortedTransactions()` and `Log.deletableSegments()` APIs.
   
   **Tests:**
   Relying on existing unit tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 commented on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-12 Thread GitBox


dejan2609 commented on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-840198347


   Short update: 
   - the first issue occurs only for Java "1.8.0_292" (not only here but worldwide, and it is recognized by Oracle)
   - the second issue... I did report it to Gradle, but maybe it should be addressed to **_Apache Directory Server_** instead (note: this project is using apacheds version **_2.0.0-M24_**, and it seems that this particular issue is solved in apacheds **_2.0.0-M26_**; the good news is that the upgrade is not required: a good old exclusion should do the trick).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #10683: KAFKA-12648: introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

2021-05-12 Thread GitBox


ableegoldman opened a new pull request #10683:
URL: https://github.com/apache/kafka/pull/10683


   Next up after [#10609](https://github.com/apache/kafka/pull/10609)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10666: MINOR: prevent cleanup() from being called while Streams is still shutting down

2021-05-12 Thread GitBox


ableegoldman commented on pull request #10666:
URL: https://github.com/apache/kafka/pull/10666#issuecomment-840173582


   cc @guozhangwang @cadonna @wcarlson5 @lct45 to review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-6654) Customize SSLContext creation

2021-05-12 Thread Maulin Vasavada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343635#comment-17343635
 ] 

Maulin Vasavada commented on KAFKA-6654:


[~soarez] See the above.

> Customize SSLContext creation
> -
>
> Key: KAFKA-6654
> URL: https://issues.apache.org/jira/browse/KAFKA-6654
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 1.0.0
>Reporter: Robert Wruck
>Priority: Major
>
> Currently, loading of SSL keystore and truststore always uses a 
> FileInputStream (SslFactory.SecurityStore) and cannot be changed to load 
> keystores from other locations such as the classpath, raw byte arrays etc.
> Furthermore, passwords for the key stores have to be provided as plaintext 
> configuration properties.
> Delegating the creation of an SSLContext to a customizable implementation 
> might solve some more issues such as KAFKA-5519, KAFKA-4933, KAFKA-4294, 
> KAFKA-2629 by enabling Kafka users to implement their own.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6654) Customize SSLContext creation

2021-05-12 Thread Maulin Vasavada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343634#comment-17343634
 ] 

Maulin Vasavada commented on KAFKA-6654:


KIP-519 addresses this issue.

> Customize SSLContext creation
> -
>
> Key: KAFKA-6654
> URL: https://issues.apache.org/jira/browse/KAFKA-6654
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 1.0.0
>Reporter: Robert Wruck
>Priority: Major
>
> Currently, loading of SSL keystore and truststore always uses a 
> FileInputStream (SslFactory.SecurityStore) and cannot be changed to load 
> keystores from other locations such as the classpath, raw byte arrays etc.
> Furthermore, passwords for the key stores have to be provided as plaintext 
> configuration properties.
> Delegating the creation of an SSLContext to a customizable implementation 
> might solve some more issues such as KAFKA-5519, KAFKA-4933, KAFKA-4294, 
> KAFKA-2629 by enabling Kafka users to implement their own.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] eas5 opened a new pull request #10682: Refactored test using parameterization

2021-05-12 Thread GitBox


eas5 opened a new pull request #10682:
URL: https://github.com/apache/kafka/pull/10682


   Problem:
   (1) Conditions or repetition structures within a test method may produce a false test outcome: when a condition is not met and the code block inside it is never executed, a false 'passed' outcome can be reported without the test logic ever running.
   (2) A test method with many individual assertions stops at the first failed assertion, which prevents the remaining ones from executing, even though those assertions differ only in their arguments.
   
   Solution:
   Parameterized tests make it possible to run a test multiple times with different arguments. Using JUnit's parameterized-test feature instead of a repetition structure lets each repeated block run as an independent test. This way, the original tests become as many independent tests as there are versions to be tested. In this refactoring, no original assertion parameter was changed.
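   
   For illustration, a minimal parameterized test in the shape described (JUnit 5 used from Scala; the version list and the helper are hypothetical, not taken from the refactored tests):
   ```scala
   import org.junit.jupiter.api.Assertions.assertEquals
   import org.junit.jupiter.params.ParameterizedTest
   import org.junit.jupiter.params.provider.ValueSource

   class VersionRoundTripTest {
     // Hypothetical helper standing in for whatever the real tests exercise.
     private def parseAndRender(version: String): String = version.trim

     // Each version runs as an independent test case, so one failure no longer
     // hides the results for the remaining versions.
     @ParameterizedTest
     @ValueSource(strings = Array("0.10.2", "0.11.0", "1.1", "2.8"))
     def testVersionRoundTrip(version: String): Unit =
       assertEquals(version, parseAndRender(version))
   }
   ```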
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j

2021-05-12 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12774:
---
Fix Version/s: 2.8.1
   3.0.0

> kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through 
> log4j
> 
>
> Key: KAFKA-12774
> URL: https://issues.apache.org/jira/browse/KAFKA-12774
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Jørgen
>Priority: Minor
> Fix For: 3.0.0, 2.8.1
>
>
> When exceptions are handled in the uncaught-exception handler introduced in 
> KS 2.8, the logging of the stacktrace doesn't seem to go through the logging 
> framework configured by the application (log4j2 in our case), but gets 
> printed to the console "line-by-line".
> All other exceptions logged by kafka-streams go through log4j2 and get 
> formatted properly according to the log4j2 appender (json in our case). 
> Haven't tested this on other frameworks like logback.
> Application setup:
>  * Spring-boot 2.4.5
>  * Log4j 2.13.3
>  * Slf4j 1.7.30
> Log4j2 appender config:
> {code:java}
> 
> 
>  stacktraceAsString="true" properties="true">
>  value="$${date:-MM-dd'T'HH:mm:ss.SSSZ}"/>
> 
> 
>  {code}
> Uncaught exception handler config:
> {code:java}
> kafkaStreams.setUncaughtExceptionHandler { exception ->
> logger.warn("Uncaught exception handled - replacing thread", exception) 
> // logged properly
> 
> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
> } {code}
> Stacktrace that gets printed line-by-line:
> {code:java}
> Exception in thread "xxx-f5860dff-9a41-490e-8ab0-540b1a7f9ce4-StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic xxx-repartition for task 3_2 due 
> to:org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.Exception handler choose to FAIL the processing, no more 
> records would be sent.  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:226)
>at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:783)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:430)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:315)  
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
>   at java.base/java.lang.Thread.run(Unknown Source)Caused by: 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id. {code}
>  
> It's a little bit hard to reproduce as I haven't found any way to trigger 
> uncaught-exception-handler through junit-tests.
> Link to discussion on slack: 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1620389197436700



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j

2021-05-12 Thread Walker Carlson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343624#comment-17343624
 ] 

Walker Carlson commented on KAFKA-12774:


I looked at the code, and it seems that in the replace-thread path we don't actually log the exception unless it's for the global thread; we just rethrow the exception to kill the thread. This is what we used to do, so I am not sure why it is not logging correctly. We do log it in all the other cases.

In the other paths we log the exception with some flavor text. We could add that to the replace-thread option, but probably we just want to add a log at the top of the handler with the error.

I suppose we should also check whether the same behavior occurs with the shutdown-client and shutdown-application options.

> kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through 
> log4j
> 
>
> Key: KAFKA-12774
> URL: https://issues.apache.org/jira/browse/KAFKA-12774
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Jørgen
>Priority: Minor
>
> When exceptions are handled in the uncaught-exception handler introduced in 
> KS 2.8, the logging of the stacktrace doesn't seem to go through the logging 
> framework configured by the application (log4j2 in our case), but gets 
> printed to the console "line-by-line".
> All other exceptions logged by kafka-streams go through log4j2 and get 
> formatted properly according to the log4j2 appender (json in our case). 
> Haven't tested this on other frameworks like logback.
> Application setup:
>  * Spring-boot 2.4.5
>  * Log4j 2.13.3
>  * Slf4j 1.7.30
> Log4j2 appender config:
> {code:java}
> 
> 
>  stacktraceAsString="true" properties="true">
>  value="$${date:-MM-dd'T'HH:mm:ss.SSSZ}"/>
> 
> 
>  {code}
> Uncaught exception handler config:
> {code:java}
> kafkaStreams.setUncaughtExceptionHandler { exception ->
> logger.warn("Uncaught exception handled - replacing thread", exception) 
> // logged properly
> 
> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
> } {code}
> Stacktrace that gets printed line-by-line:
> {code:java}
> Exception in thread "xxx-f5860dff-9a41-490e-8ab0-540b1a7f9ce4-StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic xxx-repartition for task 3_2 due 
> to:org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.Exception handler choose to FAIL the processing, no more 
> records would be sent.  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:226)
>at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:783)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:430)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:315)  
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
>   at java.base/java.lang.Thread.run(Unknown Source)Caused by: 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id. {code}
>  
> It's a little bit hard to reproduce as I haven't found any way to trigger 
> uncaught-exception-handler through junit-tests.
> Link to discussion on slack: 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1620389197436700



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ryannedolan opened a new pull request #10681: KIP 731 WIP Connect per-task record rate limiting

2021-05-12 Thread GitBox


ryannedolan opened a new pull request #10681:
URL: https://github.com/apache/kafka/pull/10681


   WIP concept for a flexible RateLimiter for Connect. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence

2021-05-12 Thread NEERAJ VAIDYA (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

NEERAJ VAIDYA updated KAFKA-12776:
--
Description: 
I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). 

My application is basically a Spring boot web-application which accepts JSON 
payloads via HTTP and then pushes each to a Kafka topic. I also use Spring 
Cloud Stream Kafka in the application to create and use a Producer.

For one of my failure handling test cases, I shutdown the Kafka cluster while 
my applications are running. (Note : No messages have been published to the 
Kafka cluster before I stop the cluster)

When the producer application tries to write messages to TA, it cannot because 
the cluster is down and hence (I assume) buffers the messages. Let's say it 
receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is first and 
m4 is last).

When I bring the Kafka cluster back online, the producer sends the buffered 
messages to the topic, but they are not in order. I receive for example, m2 
then m3 then m1 and then m4.

Why is that ? Is it because the buffering in the producer is multi-threaded 
with each producing to the topic at the same time ?

My project code is attached herewith.

I can confirm that I have enabled idempotence. I have also tried with 
```max.in.flight.requests=1```

  was:
I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). I also have a 
Kafka streams application which consumes from TA and writes to topic-B (TB). In 
the streams application, I have a custom timestamp extractor which extracts the 
timestamp from the message payload.

For one of my failure handling test cases, I shutdown the Kafka cluster while 
my applications are running. (Note : No messages have been published to the 
Kafka cluster before I stop the cluster)

When the producer application tries to write messages to TA, it cannot because 
the cluster is down and hence (I assume) buffers the messages. Let's say it 
receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is first and 
m4 is last).

When I bring the Kafka cluster back online, the producer sends the buffered 
messages to the topic, but they are not in order. I receive for example, m2 
then m3 then m1 and then m4.

Why is that ? Is it because the buffering in the producer is multi-threaded 
with each producing to the topic at the same time ?

My application is basically a Spring boot web-application which accepts JSON 
payloads and then pushes them to a Kafka topic. I also use Spring Cloud Stream 
Kafka within it to create and use a Producer.

My project code is attached herewith.

 


> Producer sends messages out-of-order inspite of enabling idempotence
> 
>
> Key: KAFKA-12776
> URL: https://issues.apache.org/jira/browse/KAFKA-12776
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0, 2.7.0
> Environment: Linux RHEL 7.9 and Ubuntu 20.04
>Reporter: NEERAJ VAIDYA
>Priority: Major
> Attachments: mocker.zip
>
>
> I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). 
> My application is basically a Spring boot web-application which accepts JSON 
> payloads via HTTP and then pushes each to a Kafka topic. I also use Spring 
> Cloud Stream Kafka in the application to create and use a Producer.
> For one of my failure handling test cases, I shutdown the Kafka cluster while 
> my applications are running. (Note : No messages have been published to the 
> Kafka cluster before I stop the cluster)
> When the producer application tries to write messages to TA, it cannot 
> because the cluster is down and hence (I assume) buffers the messages. Let's 
> say it receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is 
> first and m4 is last).
> When I bring the Kafka cluster back online, the producer sends the buffered 
> messages to the topic, but they are not in order. I receive for example, m2 
> then m3 then m1 and then m4.
> Why is that ? Is it because the buffering in the producer is multi-threaded 
> with each producing to the topic at the same time ?
> My project code is attached herewith.
> I can confirm that I have enabled idempotence. I have also tried with 
> ```max.in.flight.requests=1```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on pull request #10326: Avoid newly replicating brokers in RackAwareReplicaSelector

2021-05-12 Thread GitBox


cmccabe commented on pull request #10326:
URL: https://github.com/apache/kafka/pull/10326#issuecomment-840123466


   Does this need a KIP? cc @ijuma and @hachikuji 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 edited a comment on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-12 Thread GitBox


dejan2609 edited a comment on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-840090090


   Also... both here on Jenkins and on my laptop there is another type of problem, with some tests failing like this: 
   
   ```
   org.apache.directory.api.ldap.schema.extractor.UniqueResourceException: 
Problem locating LDIF file in schema repository
   Multiple copies of resource named 
'schema/ou=schema/cn=apachedns/ou=objectclasses/m-oid=1.3.6.1.4.1.18060.0.4.2.3.9.ldif'
 located on classpath at urls
   
jarːfileː/home/jenkins/.gradle/caches/modules-2/files-2.1/org.apache.directory.api/api-all/1.0.2/5a0f957dfe453ecaa6e7cbb19d2698083116e123/api-all-1.0.2.jar!/schema/ou%3dschema/cn%3dapachedns/ou%3dobjectclasses/m-oid%3d1.3.6.1.4.1.18060.0.4.2.3.9.ldif
   
jarːfileː/home/jenkins/.gradle/caches/modules-2/files-2.1/org.apache.directory.api/api-ldap-schema-data/1.0.0/eca194f6baca671c466a4eb3ebc669856d997baa/api-ldap-schema-data-1.0.0.jar!/schema/ou%3dschema/cn%3dapachedns/ou%3dobjectclasses/m-oid%3d1.3.6.1.4.1.18060.0.4.2.3.9.ldif
   
   ```
   
   There are some similar problems recorded (although with Maven... but luckily 
I'm a Maven aficionado, too ):
- https://issues.apache.org/jira/browse/DIRSERVER-1606
- https://issues.apache.org/jira/browse/DIRSERVER-2066 
- 
https://stackoverflow.com/questions/17217465/apache-directory-ldap-api-getting-up-and-running
   
   I will investigate this also (could be some regression in Gradle 7).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 edited a comment on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-12 Thread GitBox


dejan2609 edited a comment on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-840090090


   Also... both here on Jenkins and on my laptop there is another type of problem, with some tests failing like this: 
   
   ```
   org.apache.directory.api.ldap.schema.extractor.UniqueResourceException: 
Problem locating LDIF file in schema repository
   Multiple copies of resource named 
'schema/ou=schema/cn=apachedns/ou=objectclasses/m-oid=1.3.6.1.4.1.18060.0.4.2.3.9.ldif'
 located on classpath at urls
   
jarːfileː/home/jenkins/.gradle/caches/modules-2/files-2.1/org.apache.directory.api/api-all/1.0.2/5a0f957dfe453ecaa6e7cbb19d2698083116e123/api-all-1.0.2.jar!/schema/ou%3dschema/cn%3dapachedns/ou%3dobjectclasses/m-oid%3d1.3.6.1.4.1.18060.0.4.2.3.9.ldif
   
jarːfileː/home/jenkins/.gradle/caches/modules-2/files-2.1/org.apache.directory.api/api-ldap-schema-data/1.0.0/eca194f6baca671c466a4eb3ebc669856d997baa/api-ldap-schema-data-1.0.0.jar!/schema/ou%3dschema/cn%3dapachedns/ou%3dobjectclasses/m-oid%3d1.3.6.1.4.1.18060.0.4.2.3.9.ldif
   
   ```
   
   There are some similar problems recorded (albeit with Maven):
- https://issues.apache.org/jira/browse/DIRSERVER-1606
- https://issues.apache.org/jira/browse/DIRSERVER-2066 
- 
https://stackoverflow.com/questions/17217465/apache-directory-ldap-api-getting-up-and-running
   
   I will investigate this also (could be some regression in Gradle 7).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 edited a comment on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-12 Thread GitBox


dejan2609 edited a comment on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-840090090


   Also... both here on Jenkins and on my laptop there is another type of problem, with some tests failing like this: 
   
   ```
   org.apache.directory.api.ldap.schema.extractor.UniqueResourceException: 
Problem locating LDIF file in schema repository
   Multiple copies of resource named 
'schema/ou=schema/cn=apachedns/ou=objectclasses/m-oid=1.3.6.1.4.1.18060.0.4.2.3.9.ldif'
 located on classpath at urls
   
jarːfileː/home/jenkins/.gradle/caches/modules-2/files-2.1/org.apache.directory.api/api-all/1.0.2/5a0f957dfe453ecaa6e7cbb19d2698083116e123/api-all-1.0.2.jar!/schema/ou%3dschema/cn%3dapachedns/ou%3dobjectclasses/m-oid%3d1.3.6.1.4.1.18060.0.4.2.3.9.ldif
   
jarːfileː/home/jenkins/.gradle/caches/modules-2/files-2.1/org.apache.directory.api/api-ldap-schema-data/1.0.0/eca194f6baca671c466a4eb3ebc669856d997baa/api-ldap-schema-data-1.0.0.jar!/schema/ou%3dschema/cn%3dapachedns/ou%3dobjectclasses/m-oid%3d1.3.6.1.4.1.18060.0.4.2.3.9.ldif
   
   ```
   
   There are some similar problems recorded (albeit with Maven):
- https://issues.apache.org/jira/browse/DIRSERVER-1606
- https://issues.apache.org/jira/browse/DIRSERVER-2066 
- 
https://stackoverflow.com/questions/17217465/apache-directory-ldap-api-getting-up-and-running
   
   I will investigate this also (could be some regression in Gradle 7).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12775) StreamsPartitionAssignor / ClientState throws an exception when a new Task gets added to a KStreams Application in a Backwards-Compatible Topology Change

2021-05-12 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343588#comment-17343588
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12775:


If it's any consolation, I just so happen to be working on a way to isolate 
independent parts of the topology to enable certain kinds of upgrades as we 
speak. It's still very much in the experimental phase so no KIP yet, but 
hopefully you won't have to wait too long. Until then, it's probably best to 
play it safe as you said

> StreamsPartitionAssignor / ClientState throws an exception when a new Task 
> gets added to a KStreams Application in a Backwards-Compatible Topology Change
> -
>
> Key: KAFKA-12775
> URL: https://issues.apache.org/jira/browse/KAFKA-12775
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Nico Habermann
>Priority: Major
>
> KAFKA-6145 and KAFKA-10079 added an [exception if the Partition Assignor 
> tries to look up the lag for a TaskId that seemingly does not 
> exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].
>  
> I believe this is a functional regression.
> Before, it was possible for Streams users to make backwards-compatible 
> topology changes and roll those out, without having to do a complete restore 
> or reload.
> For example:
> Existing sample topology:
>  
> {code:java}
> stream1 = stream(topic)
> stream1
>   .map(...)
>   .to(output){code}
> And doing this backwards-compatible change:
> {code:java}
> stream1 = stream(topic)
> ++ table = stream(topic2).through(repartition-topic)/repartition().toTable()
> stream1
>   .map(...)
> ++  .join(table)
>   .to(output){code}
>  
> This effectively creates a new subtopology with a new task for the table 
> repartition.
> In older KStreams versions, it would have been possible to simply roll this 
> change out.
> Since 2.6, rolling this out will crash the stream because the linked 
> exception gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag  
> tries to look up the lag for the new table-repartition-task
>  
> At this time, the only possible way to avoid this exception seems to be 
> deleting all local state and doing a complete restore with the new topology 
> change included.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dejan2609 commented on pull request #10658: POC for CheckStyle 8.42 regression (with 'Unnecessary Parentheses' errors)

2021-05-12 Thread GitBox


dejan2609 commented on pull request #10658:
URL: https://github.com/apache/kafka/pull/10658#issuecomment-840092184


   Note: comment (related to a different PR) was accidentally submitted here 
and then deleted... sorry for spamming.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 edited a comment on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-12 Thread GitBox


dejan2609 edited a comment on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-840081435


   Note: I am trying to find the exact JDK version for the GitHub-generated Jenkins build: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-10606/3/pipeline/16
 because it seems that I have a more recent Java 8 version installed.
   
   Tests that fail here on Jenkins: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-10606/1/tests
   
   Around 50 tests fail _**only**_ on my local box: 
   
   Gradle command: `./gradlew -PscalaVersion=2.12 test`
   
   Environment: 
   ```
   dejan@dejan-HP-ProBook-450-G7:~$ java -version
   openjdk version "1.8.0_292"
   OpenJDK Runtime Environment (build 1.8.0_292-8u292-b10-0ubuntu1~20.04-b10)
   OpenJDK 64-Bit Server VM (build 25.292-b10, mixed mode)
   dejan@dejan-HP-ProBook-450-G7:~$ date
   сре, 12. мај 2021.  22:20:54 CEST
   dejan@dejan-HP-ProBook-450-G7:~$
   ```
   
   All failed test stacktrace starts like this: 
   ```
   org.apache.kafka.common.errors.InvalidConfigurationException: Invalid PEM 
keystore configs
   Caused by: java.security.KeyStoreException: Key protection  algorithm not 
found: java.security.UnrecoverableKeyException: Encrypt Private Key failed: 
unrecognized algorithm name: PBEWithSHA1AndDESede
at 
sun.security.pkcs12.PKCS12KeyStore.setKeyEntry(PKCS12KeyStore.java:677)
at 
sun.security.pkcs12.PKCS12KeyStore.engineSetKeyEntry(PKCS12KeyStore.java:577)
at java.security.KeyStore.setKeyEntry(KeyStore.java:1140)
   ```
   
   It seems that these other people have similar issues: 
   - https://github.com/jenkinsci/docker/issues/1112 
   - https://github.com/bcgit/bc-java/issues/941#issuecomment-836055626
   
   Also, Oracle opened few JIRA related tickets: 
- https://bugs.openjdk.java.net/browse/JDK-8266261 After 8u282 -> 8u292 
update: "unrecognized algorithm name: PBEWithSHA1AndDESede"
- https://bugs.openjdk.java.net/browse/JDK-8266279 8u292 
NoSuchAlgorithmException unrecognized algorithm name: PBEWithSHA1AndDESede


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 commented on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-12 Thread GitBox


dejan2609 commented on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-840090090


   Also... both here on Jenkins and on my laptop there is another type of problem, with some tests failing like this: 
   
   ```
   org.apache.directory.api.ldap.schema.extractor.UniqueResourceException: 
Problem locating LDIF file in schema repository
   Multiple copies of resource named 
'schema/ou=schema/cn=apachedns/ou=objectclasses/m-oid=1.3.6.1.4.1.18060.0.4.2.3.9.ldif'
 located on classpath at urls
   
jarːfileː/home/jenkins/.gradle/caches/modules-2/files-2.1/org.apache.directory.api/api-all/1.0.2/5a0f957dfe453ecaa6e7cbb19d2698083116e123/api-all-1.0.2.jar!/schema/ou%3dschema/cn%3dapachedns/ou%3dobjectclasses/m-oid%3d1.3.6.1.4.1.18060.0.4.2.3.9.ldif
   
jarːfileː/home/jenkins/.gradle/caches/modules-2/files-2.1/org.apache.directory.api/api-ldap-schema-data/1.0.0/eca194f6baca671c466a4eb3ebc669856d997baa/api-ldap-schema-data-1.0.0.jar!/schema/ou%3dschema/cn%3dapachedns/ou%3dobjectclasses/m-oid%3d1.3.6.1.4.1.18060.0.4.2.3.9.ldif
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 removed a comment on pull request #10658: POC for CheckStyle 8.42 regression (with 'Unnecessary Parentheses' errors)

2021-05-12 Thread GitBox


dejan2609 removed a comment on pull request #10658:
URL: https://github.com/apache/kafka/pull/10658#issuecomment-840084223


   Also... both here and on my laptop there is another type of problem, with some tests failing like this: 
   
   ```
   org.apache.directory.api.ldap.schema.extractor.UniqueResourceException: 
Problem locating LDIF file in schema repository
   Multiple copies of resource named 
'schema/ou=schema/cn=apachedns/ou=objectclasses/m-oid=1.3.6.1.4.1.18060.0.4.2.3.9.ldif'
 located on classpath at urls
   
jar:file:/home/jenkins/.gradle/caches/modules-2/files-2.1/org.apache.directory.api/api-all/1.0.2/5a0f957dfe453ecaa6e7cbb19d2698083116e123/api-all-1.0.2.jar!/schema/ou%3dschema/cn%3dapachedns/ou%3dobjectclasses/m-oid%3d1.3.6.1.4.1.18060.0.4.2.3.9.ldif
   
jar:file:/home/jenkins/.gradle/caches/modules-2/files-2.1/org.apache.directory.api/api-ldap-schema-data/1.0.0/eca194f6baca671c466a4eb3ebc669856d997baa/api-ldap-schema-data-1.0.0.jar!/schema/ou%3dschema/cn%3dapachedns/ou%3dobjectclasses/m-oid%3d1.3.6.1.4.1.18060.0.4.2.3.9.ldif
   
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 edited a comment on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-12 Thread GitBox


dejan2609 edited a comment on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-840081435


   Note: I am trying to extract the JDK version for the GitHub-generated Jenkins build: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-10606/3/pipeline/16
 because it seems that I have a more recent Java 8 version installed.
   
   Tests that fail here on Jenkins: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-10606/1/tests
   
   Around 50 tests fail only on my local box: 
   
   Gradle command: `./gradlew -PscalaVersion=2.12 test`
   
   Environment: 
   ```
   dejan@dejan-HP-ProBook-450-G7:~$ java -version
   openjdk version "1.8.0_292"
   OpenJDK Runtime Environment (build 1.8.0_292-8u292-b10-0ubuntu1~20.04-b10)
   OpenJDK 64-Bit Server VM (build 25.292-b10, mixed mode)
   dejan@dejan-HP-ProBook-450-G7:~$ date
   сре, 12. мај 2021.  22:20:54 CEST
   dejan@dejan-HP-ProBook-450-G7:~$
   ```
   
   All failed test stacktraces start like this: 
   ```
   org.apache.kafka.common.errors.InvalidConfigurationException: Invalid PEM 
keystore configs
   Caused by: java.security.KeyStoreException: Key protection  algorithm not 
found: java.security.UnrecoverableKeyException: Encrypt Private Key failed: 
unrecognized algorithm name: PBEWithSHA1AndDESede
at 
sun.security.pkcs12.PKCS12KeyStore.setKeyEntry(PKCS12KeyStore.java:677)
at 
sun.security.pkcs12.PKCS12KeyStore.engineSetKeyEntry(PKCS12KeyStore.java:577)
at java.security.KeyStore.setKeyEntry(KeyStore.java:1140)
   ```
   
   It seems that these other people have similar issues: 
   - https://github.com/jenkinsci/docker/issues/1112 
   - https://github.com/bcgit/bc-java/issues/941#issuecomment-836055626
   
   Also, Oracle opened a few related JIRA tickets: 
- https://bugs.openjdk.java.net/browse/JDK-8266261 After 8u282 -> 8u292 
update: "unrecognized algorithm name: PBEWithSHA1AndDESede"
- https://bugs.openjdk.java.net/browse/JDK-8266279 8u292 
NoSuchAlgorithmException unrecognized algorithm name: PBEWithSHA1AndDESede
   
   
   
   
   
   
   
   
   
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence

2021-05-12 Thread NEERAJ VAIDYA (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

NEERAJ VAIDYA updated KAFKA-12776:
--
Attachment: mocker.zip

> Producer sends messages out-of-order inspite of enabling idempotence
> 
>
> Key: KAFKA-12776
> URL: https://issues.apache.org/jira/browse/KAFKA-12776
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0, 2.7.0
> Environment: Linux RHEL 7.9 and Ubuntu 20.04
>Reporter: NEERAJ VAIDYA
>Priority: Major
> Attachments: mocker.zip
>
>
> I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). I also have 
> a Kafka streams application which consumes from TA and writes to topic-B 
> (TB). In the streams application, I have a custom timestamp extractor which 
> extracts the timestamp from the message payload.
> For one of my failure handling test cases, I shutdown the Kafka cluster while 
> my applications are running. (Note : No messages have been published to the 
> Kafka cluster before I stop the cluster)
> When the producer application tries to write messages to TA, it cannot 
> because the cluster is down and hence (I assume) buffers the messages. Let's 
> say it receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is 
> first and m4 is last).
> When I bring the Kafka cluster back online, the producer sends the buffered 
> messages to the topic, but they are not in order. I receive for example, m2 
> then m3 then m1 and then m4.
> Why is that ? Is it because the buffering in the producer is multi-threaded 
> with each producing to the topic at the same time ?
> My application is basically a Spring boot web-application which accepts JSON 
> payloads and then pushes them to a Kafka topic. I also use Spring Cloud 
> Stream Kafka within it to create and use a Producer.
> My project code is attached herewith.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12775) StreamsPartitionAssignor / ClientState throws an exception when a new Task gets added to a KStreams Application in a Backwards-Compatible Topology Change

2021-05-12 Thread Nico Habermann (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343578#comment-17343578
 ] 

Nico Habermann commented on KAFKA-12775:


[~ableegoldman] Thanks for the extra help! My case is a bit different - I'm not 
removing a subtopology / task, I'm adding a new one. The end result is still 
much the same: an internal defensive check throws an exception because it sees 
something that should not exist. However, right now I'm avoiding that entire 
code path by deleting all local state, so your tip of surgically deleting only 
certain task directories (and not the entire thing) may still apply to this 
situation. I'll investigate - thanks!

Overall, though, it has become quite apparent to me that we're going against 
Kafka Streams' interface and guarantees here, and it's probably only a matter of 
time until we apply such a workaround in the wrong way, or until a new Kafka 
Streams version changes internal behaviour that we overlook and we somehow 
corrupt our state without noticing. I'll be keeping an eye on those KIPs for 
topology changes though :)

> StreamsPartitionAssignor / ClientState throws an exception when a new Task 
> gets added to a KStreams Application in a Backwards-Compatible Topology Change
> -
>
> Key: KAFKA-12775
> URL: https://issues.apache.org/jira/browse/KAFKA-12775
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Nico Habermann
>Priority: Major
>
> KAFKA-6145 and KAFKA-10079 added an [exception if the Partition Assignor 
> tries to look up the lag for a TaskId that seemingly does not 
> exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].
>  
> I believe this is a functional regression.
> Before, it was possible for Streams users to make backwards-compatible 
> topology changes and roll those out, without having to do a complete restore 
> or reload.
> For example:
> Existing sample topology:
>  
> {code:java}
> stream1 = stream(topic)
> stream1
>   .map(...)
>   .to(output){code}
> And doing this backwards-compatible change:
> {code:java}
> stream1 = stream(topic)
> ++ table = stream(topic2).through(repartition-topic)/repartition().toTable()
> stream1
>   .map(...)
> ++  .join(table)
>   .to(output){code}
>  
> This effectively creates a new subtopology with a new task for the table 
> repartition.
> In older KStreams versions, it would have been possible to simply roll this 
> change out.
> Since 2.6, rolling this out will crash the stream because the linked 
> exception gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag  
> tries to look up the lag for the new table-repartition-task
>  
> At this time, the only possible way to avoid this exception seems to be 
> deleting all local state and doing a complete restore with the new topology 
> change included.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12775) StreamsPartitionAssignor / ClientState throws an exception when a new Task gets added to a KStreams Application in a Backwards-Compatible Topology Change

2021-05-12 Thread Nico Habermann (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343578#comment-17343578
 ] 

Nico Habermann edited comment on KAFKA-12775 at 5/12/21, 8:52 PM:
--

[~ableegoldman] Thanks for the extra help! Our case is a bit different - we're 
not removing a subtopology / task, we're adding a new one. The end result is 
still much the same: an internal defensive check throws an exception because it 
sees something that should not exist. However, right now I'm avoiding that 
entire code path by deleting all local state, so your tip of surgically 
deleting only certain task directories (and not the entire thing) may still 
apply to this situation. I'll investigate - thanks!

Overall, though, it has become quite apparent to me that we're going against 
Kafka Streams' interface and guarantees here, and it's probably only a matter of 
time until we apply such a workaround in the wrong way, or until a new Kafka 
Streams version changes internal behaviour that we overlook and we somehow 
corrupt our state without noticing. I'll be keeping an eye on those KIPs for 
topology changes though :)


was (Author: nhab):
[~ableegoldman] Thanks for the extra help! My case is a bit different - I'm not 
removing a subtopology / task, I'm adding a new one. The end result is still 
quite the same, an internal defensive check that throws an exception because 
it's seeing something that should not exist. However right now I'm avoiding 
that entire code path by deleting all local state, so your tip of surgically 
deleting only certain Task Dirs (and not the entire thing) may still apply to 
this situation. I'll investigate - thanks!

Over all though it has become quite apparent to me that we're going against 
Kafka Stream's interface and guarantees here and it's probably only a matter of 
time until we apply such a workaround in the wrong way, or a new Kafka Streams 
version may change internal behaviour which we overlook and we somehow corrupt 
our state without noticing. I'll be keeping an eye on those KIPs for topology 
changes though :)

> StreamsPartitionAssignor / ClientState throws an exception when a new Task 
> gets added to a KStreams Application in a Backwards-Compatible Topology Change
> -
>
> Key: KAFKA-12775
> URL: https://issues.apache.org/jira/browse/KAFKA-12775
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Nico Habermann
>Priority: Major
>
> KAFKA-6145 and KAFKA-10079 added an [exception if the Partition Assignor 
> tries to look up the lag for a TaskId that seemingly does not 
> exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].
>  
> I believe this is a functional regression.
> Before, it was possible for Streams users to make backwards-compatible 
> topology changes and roll those out, without having to do a complete restore 
> or reload.
> For example:
> Existing sample topology:
>  
> {code:java}
> stream1 = stream(topic)
> stream1
>   .map(...)
>   .to(output){code}
> And doing this backwards-compatible change:
> {code:java}
> stream1 = stream(topic)
> ++ table = stream(topic2).through(repartition-topic)/repartition().toTable()
> stream1
>   .map(...)
> ++  .join(table)
>   .to(output){code}
>  
> This effectively creates a new subtopology with a new task for the table 
> repartition.
> In older KStreams versions, it would have been possible to simply roll this 
> change out.
> Since 2.6, rolling this out will crash the stream because the linked 
> exception gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag  
> tries to look up the lag for the new table-repartition-task
>  
> At this time, the only possible way to avoid this exception seems to be 
> deleting all local state and doing a complete restore with the new topology 
> change included.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence

2021-05-12 Thread NEERAJ VAIDYA (Jira)
NEERAJ VAIDYA created KAFKA-12776:
-

 Summary: Producer sends messages out-of-order inspite of enabling 
idempotence
 Key: KAFKA-12776
 URL: https://issues.apache.org/jira/browse/KAFKA-12776
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 2.7.0, 2.6.0
 Environment: Linux RHEL 7.9 and Ubuntu 20.04
Reporter: NEERAJ VAIDYA


I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). I also have a 
Kafka streams application which consumes from TA and writes to topic-B (TB). In 
the streams application, I have a custom timestamp extractor which extracts the 
timestamp from the message payload.

For one of my failure handling test cases, I shutdown the Kafka cluster while 
my applications are running. (Note : No messages have been published to the 
Kafka cluster before I stop the cluster)

When the producer application tries to write messages to TA, it cannot because 
the cluster is down and hence (I assume) buffers the messages. Let's say it 
receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is first and 
m4 is last).

When I bring the Kafka cluster back online, the producer sends the buffered 
messages to the topic, but they are not in order. I receive for example, m2 
then m3 then m1 and then m4.

Why is that? Is it because the buffering in the producer is multi-threaded, 
with each thread producing to the topic at the same time?

My application is basically a Spring boot web-application which accepts JSON 
payloads and then pushes them to a Kafka topic. I also use Spring Cloud Stream 
Kafka within it to create and use a Producer.

My project code is attached herewith.
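
For reference, a minimal sketch (plain Java client rather than the attached Spring Cloud Stream project, and assuming all four messages share a key so they land in one partition) of the producer settings that should preserve per-partition ordering when idempotence is enabled:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class OrderedProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);          // requires acks=all
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // <= 5 keeps ordering with idempotence
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (String value : new String[]{"m1", "m2", "m3", "m4"}) {
                // Same key => same partition => the broker preserves send order.
                producer.send(new ProducerRecord<>("topic-A", "same-key", value));
            }
        }
    }
}
{code}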

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #10673: MINOR: set replication.factor to 1 to make StreamsBrokerCompatibility…

2021-05-12 Thread GitBox


mjsax commented on a change in pull request #10673:
URL: https://github.com/apache/kafka/pull/10673#discussion_r631388964



##
File path: tests/kafkatest/services/streams.py
##
@@ -466,6 +466,15 @@ def __init__(self, test_context, kafka, processingMode):
 
"org.apache.kafka.streams.tests.BrokerCompatibilityTest",
 processingMode)
 
+def prop_file(self):
+properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+  streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers(),
+  # the old broker (< 2.4) does not support configuration 
replication.factor=-1
+  "replication.factor": 1}

Review comment:
   Fine with me, too. Feel free to merge.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 edited a comment on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-12 Thread GitBox


dejan2609 edited a comment on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-840081435


   So, here is an update: on my local box (Ubuntu Linux with Java 8) around 
50 tests fail in the same way; details: 
   
   Gradle command: `./gradlew -PscalaVersion=2.12 test`
   
   Environment: 
   ```
   dejan@dejan-HP-ProBook-450-G7:~$ java -version
   openjdk version "1.8.0_292"
   OpenJDK Runtime Environment (build 1.8.0_292-8u292-b10-0ubuntu1~20.04-b10)
   OpenJDK 64-Bit Server VM (build 25.292-b10, mixed mode)
   dejan@dejan-HP-ProBook-450-G7:~$ date
   сре, 12. мај 2021.  22:20:54 CEST
   dejan@dejan-HP-ProBook-450-G7:~$
   ```
   
   All failed test stacktraces start like this: 
   ```
   org.apache.kafka.common.errors.InvalidConfigurationException: Invalid PEM 
keystore configs
   Caused by: java.security.KeyStoreException: Key protection  algorithm not 
found: java.security.UnrecoverableKeyException: Encrypt Private Key failed: 
unrecognized algorithm name: PBEWithSHA1AndDESede
at 
sun.security.pkcs12.PKCS12KeyStore.setKeyEntry(PKCS12KeyStore.java:677)
at 
sun.security.pkcs12.PKCS12KeyStore.engineSetKeyEntry(PKCS12KeyStore.java:577)
at java.security.KeyStore.setKeyEntry(KeyStore.java:1140)
   ```
   
   It seems that these other people have similar issues: 
   - https://github.com/jenkinsci/docker/issues/1112 
   - https://github.com/bcgit/bc-java/issues/941#issuecomment-836055626
   
   Also, Oracle opened a few related JIRA tickets: 
- https://bugs.openjdk.java.net/browse/JDK-8266261 After 8u282 -> 8u292 
update: "unrecognized algorithm name: PBEWithSHA1AndDESede"
- https://bugs.openjdk.java.net/browse/JDK-8266279 8u292 
NoSuchAlgorithmException unrecognized algorithm name: PBEWithSHA1AndDESede
   
   
   Note: I am trying to extract the JDK version for the GitHub-generated Jenkins build: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-10606/3/pipeline/16
 
   
   
   
   
   
   
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 commented on pull request #10658: POC for CheckStyle 8.42 regression (with 'Unnecessary Parentheses' errors)

2021-05-12 Thread GitBox


dejan2609 commented on pull request #10658:
URL: https://github.com/apache/kafka/pull/10658#issuecomment-840084223


   Also... both here and on my laptop there is another type of problem, with some tests failing like this: 
   
   ```
   org.apache.directory.api.ldap.schema.extractor.UniqueResourceException: 
Problem locating LDIF file in schema repository
   Multiple copies of resource named 
'schema/ou=schema/cn=apachedns/ou=objectclasses/m-oid=1.3.6.1.4.1.18060.0.4.2.3.9.ldif'
 located on classpath at urls
   
jar:file:/home/jenkins/.gradle/caches/modules-2/files-2.1/org.apache.directory.api/api-all/1.0.2/5a0f957dfe453ecaa6e7cbb19d2698083116e123/api-all-1.0.2.jar!/schema/ou%3dschema/cn%3dapachedns/ou%3dobjectclasses/m-oid%3d1.3.6.1.4.1.18060.0.4.2.3.9.ldif
   
jar:file:/home/jenkins/.gradle/caches/modules-2/files-2.1/org.apache.directory.api/api-ldap-schema-data/1.0.0/eca194f6baca671c466a4eb3ebc669856d997baa/api-ldap-schema-data-1.0.0.jar!/schema/ou%3dschema/cn%3dapachedns/ou%3dobjectclasses/m-oid%3d1.3.6.1.4.1.18060.0.4.2.3.9.ldif
   
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

2021-05-12 Thread GitBox


junrao commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r631363750



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -16,29 +16,30 @@
  */
 package kafka.raft
 
-import java.io.{File, IOException}
-import java.nio.file.{Files, NoSuchFileException}
-import java.util.concurrent.ConcurrentSkipListSet
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException, Path}
 import java.util.{Optional, Properties}
 
 import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, 
SnapshotGenerated}
 import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, 
LogDirFailureChannel}
 import kafka.utils.{Logging, Scheduler}
 import org.apache.kafka.common.record.{MemoryRecords, Records}
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, 
LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog}
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, 
FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, 
Snapshots}
 
+import scala.annotation.nowarn
+import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
   log: Log,
   scheduler: Scheduler,
   // This object needs to be thread-safe because it is used by the 
snapshotting thread to notify the
   // polling thread when snapshots are created.
-  snapshotIds: ConcurrentSkipListSet[OffsetAndEpoch],
+  snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]],

Review comment:
   Is the above comment still accurate since snapshots is no longer thread 
safe?

##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -104,18 +105,29 @@ public static Path createTempFile(Path logDir, 
OffsetAndEpoch snapshotId) throws
 }
 
 /**
- * Delete the snapshot from the filesystem, the caller may firstly rename 
snapshot file to
- * ${file}.deleted, so we try to delete the file as well as the renamed 
file if exists.
+ * Delete the snapshot from the filesystem.
  */
-public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch 
snapshotId) {
-Path immutablePath = Snapshots.snapshotPath(logDir, snapshotId);
-Path deletingPath = Snapshots.deleteRename(immutablePath, snapshotId);
+public static boolean deleteIfExists(Path logDir, OffsetAndEpoch 
snapshotId) {
+Path immutablePath = snapshotPath(logDir, snapshotId);
+Path deletedPath = deleteRename(immutablePath, snapshotId);
 try {
-return Files.deleteIfExists(immutablePath) | 
Files.deleteIfExists(deletingPath);
+return Files.deleteIfExists(immutablePath) | 
Files.deleteIfExists(deletedPath);
 } catch (IOException e) {
-log.error("Error deleting snapshot file " + deletingPath, e);
+log.error("Error deleting snapshot files {} and {}", 
immutablePath, deletedPath, e);
 return false;
 }
 }
 
+/**
+ * Mark a snapshot for deletion by renaming with the deleted suffix
+ */
+public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
+Path immutablePath = snapshotPath(logDir, snapshotId);
+Path deletedPath = deleteRename(immutablePath, snapshotId);
+try {
+Utils.atomicMoveWithFallback(immutablePath, deletedPath, false);
+} catch (IOException e) {
+log.error("Error renaming snapshot file from {} to {}", 
immutablePath, deletedPath, e);

Review comment:
   Should we just fail the controller on IOException?

##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
##
@@ -54,8 +54,12 @@ public Records records() {
 }
 
 @Override
-public void close() throws IOException {
-fileRecords.close();
+public void close() {
+try {
+fileRecords.close();
+} catch (IOException e) {
+throw new RuntimeException(e);

Review comment:
   Should we throw KafkaStorageException?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 commented on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-12 Thread GitBox


dejan2609 commented on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-840081435


   So, here is an update: on my local box (Ubuntu Linux with Java 8) around 
50 tests fail in the same way; details: 
   
   Gradle command: `./gradlew -PscalaVersion=2.12 test`
   
   Environment: 
   ```
   dejan@dejan-HP-ProBook-450-G7:~$ java -version
   openjdk version "1.8.0_292"
   OpenJDK Runtime Environment (build 1.8.0_292-8u292-b10-0ubuntu1~20.04-b10)
   OpenJDK 64-Bit Server VM (build 25.292-b10, mixed mode)
   dejan@dejan-HP-ProBook-450-G7:~$ date
   сре, 12. мај 2021.  22:20:54 CEST
   dejan@dejan-HP-ProBook-450-G7:~$
   ```
   
   All failed test stacktraces start like this: 
   ```
   org.apache.kafka.common.errors.InvalidConfigurationException: Invalid PEM 
keystore configs
   Caused by: java.security.KeyStoreException: Key protection  algorithm not 
found: java.security.UnrecoverableKeyException: Encrypt Private Key failed: 
unrecognized algorithm name: PBEWithSHA1AndDESede
at 
sun.security.pkcs12.PKCS12KeyStore.setKeyEntry(PKCS12KeyStore.java:677)
at 
sun.security.pkcs12.PKCS12KeyStore.engineSetKeyEntry(PKCS12KeyStore.java:577)
at java.security.KeyStore.setKeyEntry(KeyStore.java:1140)
   ```
   
   It seems that these other people have similar issues: 
   - https://github.com/jenkinsci/docker/issues/1112 
   - https://github.com/bcgit/bc-java/issues/941#issuecomment-836055626
   
   Also, Oracle opened a few related JIRA tickets: 
- https://bugs.openjdk.java.net/browse/JDK-8266261 After 8u282 -> 8u292 
update: "unrecognized algorithm name: PBEWithSHA1AndDESede"
- https://bugs.openjdk.java.net/browse/JDK-8266279 8u292 
NoSuchAlgorithmException unrecognized algorithm name: PBEWithSHA1AndDESede
   
   
   
   
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding opened a new pull request #10680: Rework on KAFKA-3968: fsync the parent directory of a segment file when the file is created

2021-05-12 Thread GitBox


ccding opened a new pull request #10680:
URL: https://github.com/apache/kafka/pull/10680


   Kafka does not call fsync() on the directory when a new log segment is 
created and flushed to disk.
   
   The problem is that the following sequence of calls doesn't guarantee file 
durability:
   
   fd = open("log", O_RDWR | O_CREATE); // suppose open creates "log"
   write(fd);
   fsync(fd);
   
   If the system crashes after fsync() but before the parent directory has been 
flushed to disk, the log file can disappear.
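
   For illustration, a minimal sketch (POSIX filesystems, hypothetical helper name) of flushing the parent directory so the new directory entry becomes durable as well, not just the file contents:
   ```java
   import java.io.IOException;
   import java.nio.channels.FileChannel;
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.nio.file.StandardOpenOption;

   public class DirFlushSketch {
       // Opens the directory read-only and forces its metadata to disk.
       static void fsyncDir(Path dir) throws IOException {
           try (FileChannel channel = FileChannel.open(dir, StandardOpenOption.READ)) {
               channel.force(true);
           }
       }

       public static void main(String[] args) throws IOException {
           // Hypothetical log directory; in Kafka this would be the segment's parent dir.
           fsyncDir(Paths.get("/tmp/kafka-logs/topic-0"));
       }
   }
   ```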
   
   This PR is to flush the directory when flush() is called for the first time.
   
   I did a performance test which shows this PR has a minimal performance impact on 
Kafka clusters.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12775) StreamsPartitionAssignor / ClientState throws an exception when a new Task gets added to a KStreams Application in a Backwards-Compatible Topology Change

2021-05-12 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343508#comment-17343508
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12775:


[~nhab] while Bruno is correct that topological changes are never guaranteed to 
be compatible, if you are 100% certain that a change _is_ backwards compatible 
then you can still work around it. Instead of fully resetting the app, or even 
clearing out the entire local state, you can just delete the task directories 
corresponding to the subtopologies that no longer exist. This does require a 
somewhat deeper understanding of Kafka Streams, but imo this seems acceptable, 
as only more advanced users are likely to be able to analyze a change to 
determine whether it's really going to be compatible or not. 

Imo it's also a good idea to force users to manually clean up the parts of the 
topology which don't exist any more. If you don't, then this could corrupt your 
application later if in the future you decide to make another 
backwards-compatible change which adds a new subtopology. If you didn't clean 
up after the old one, and eventually forgot about it, then adding a new 
subtopology would not actually be compatible (until you do clean it up).

Note that to really clean it up, you'd need to also delete any internal topics 
(eg changelogs) that correspond to the removed subtopology
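
To make the surgical-deletion idea concrete, a rough sketch (hypothetical helper, run only while the application is stopped, and assuming the usual <state.dir>/<application.id>/<subtopologyId>_<partition> task directory layout):

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Comparator;
import java.util.Set;
import java.util.stream.Stream;

public class RemovedSubtopologyCleanup {
    public static void main(String[] args) throws IOException {
        // Adjust to your state.dir and application.id.
        Path appStateDir = Paths.get("/tmp/kafka-streams", "my-app-id");
        // Subtopology ids that no longer exist in the new topology.
        Set<String> removedSubtopologies = Collections.singleton("2");
        try (Stream<Path> taskDirs = Files.list(appStateDir)) {
            taskDirs.filter(dir -> {
                String name = dir.getFileName().toString(); // task dirs look like "2_0"
                int underscore = name.indexOf('_');
                return underscore > 0 && removedSubtopologies.contains(name.substring(0, underscore));
            }).forEach(RemovedSubtopologyCleanup::deleteRecursively);
        }
    }

    private static void deleteRecursively(Path dir) {
        try (Stream<Path> walk = Files.walk(dir)) {
            walk.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
{code}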

> StreamsPartitionAssignor / ClientState throws an exception when a new Task 
> gets added to a KStreams Application in a Backwards-Compatible Topology Change
> -
>
> Key: KAFKA-12775
> URL: https://issues.apache.org/jira/browse/KAFKA-12775
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Nico Habermann
>Priority: Major
>
> KAFKA-6145 and KAFKA-10079 added an [exception if the Partition Assignor 
> tries to look up the lag for a TaskId that seemingly does not 
> exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].
>  
> I believe this is a functional regression.
> Before, it was possible for Streams users to make backwards-compatible 
> topology changes and roll those out, without having to do a complete restore 
> or reload.
> For example:
> Existing sample topology:
>  
> {code:java}
> stream1 = stream(topic)
> stream1
>   .map(...)
>   .to(output){code}
> And doing this backwards-compatible change:
> {code:java}
> stream1 = stream(topic)
> ++ table = stream(topic2).through(repartition-topic)/repartition().toTable()
> stream1
>   .map(...)
> ++  .join(table)
>   .to(output){code}
>  
> This effectively creates a new subtopology with a new task for the table 
> repartition.
> In older KStreams versions, it would have been possible to simply roll this 
> change out.
> Since 2.6, rolling this out will crash the stream because the linked 
> exception gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag  
> tries to look up the lag for the new table-repartition-task
>  
> At this time, the only possible way to avoid this exception seems to be 
> deleting all local state and doing a complete restore with the new topology 
> change included.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #10673: MINOR: set replication.factor to 1 to make StreamsBrokerCompatibility…

2021-05-12 Thread GitBox


ableegoldman commented on a change in pull request #10673:
URL: https://github.com/apache/kafka/pull/10673#discussion_r631340308



##
File path: tests/kafkatest/services/streams.py
##
@@ -466,6 +466,15 @@ def __init__(self, test_context, kafka, processingMode):
 
"org.apache.kafka.streams.tests.BrokerCompatibilityTest",
 processingMode)
 
+def prop_file(self):
+properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+  streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers(),
+  # the old broker (< 2.4) does not support configuration 
replication.factor=-1
+  "replication.factor": 1}

Review comment:
   Fine with me either way. Thanks for the fix




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yeralin commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde

2021-05-12 Thread GitBox


yeralin commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r631339580



##
File path: 
clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class ListDeserializer implements Deserializer> {
+
+private static final Map>, Integer> 
FIXED_LENGTH_DESERIALIZERS = mkMap(
+mkEntry(ShortDeserializer.class, Short.BYTES),
+mkEntry(IntegerDeserializer.class, Integer.BYTES),
+mkEntry(FloatDeserializer.class, Float.BYTES),
+mkEntry(LongDeserializer.class, Long.BYTES),
+mkEntry(DoubleDeserializer.class, Double.BYTES),
+mkEntry(UUIDDeserializer.class, 36)
+);
+
+private Deserializer inner;
+private Class listClass;
+private Integer primitiveSize;
+
+public ListDeserializer() {}
+
+public > ListDeserializer(Class listClass, 
Deserializer inner) {
+if (listClass == null || inner == null) {
+throw new IllegalArgumentException("ListDeserializer requires both 
\"listClass\" and \"innerDeserializer\" parameters to be provided during 
initialization");
+}
+this.listClass = listClass;
+this.inner = inner;
+this.primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass());
+}
+
+public Deserializer getInnerDeserializer() {
+return inner;
+}
+
+@Override
+public void configure(Map configs, boolean isKey) {
+if (listClass != null || inner != null) {
+throw new ConfigException("List deserializer was already 
initialized using a non-default constructor");
+}
+configureListClass(configs, isKey);
+configureInnerSerde(configs, isKey);
+}
+
+private void configureListClass(Map configs, boolean isKey) {
+String listTypePropertyName = isKey ? 
CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS : 
CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS;
+final Object listClassOrName = configs.get(listTypePropertyName);
+if (listClassOrName == null) {
+throw new ConfigException("Not able to determine the list class 
because it was neither passed via the constructor nor set in the config.");
+}
+try {
+if (listClassOrName instanceof String) {
+listClass = Utils.loadClass((String) listClassOrName, 
Object.class);
+} else if (listClassOrName instanceof Class) {
+listClass = (Class) listClassOrName;
+} else {
+throw new KafkaException("Could not determine the list class 
instance using \"" + listTypePropertyName + "\" property.");
+}
+} catch (final ClassNotFoundException e) {
+throw new ConfigException(listTypePropertyName, listClassOrName, 
"Deserializer's list class \"" + listClassOrName + "\" could not be found.");
+}
+}
+
+@SuppressWarnings("unchecked")
+private void configureInnerSerde(Map configs, boolean isKey) {
+String innerSerdePropertyName = isKey ? 
CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : 
CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS;
+final Object innerSerdeClassOrName = 
configs.get(innerSerdePropertyName);
+if (innerSerdeClassOrName == null) {
+throw new 

[jira] [Commented] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j

2021-05-12 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343505#comment-17343505
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12774:


Thanks for the report – regarding how to reproduce this, you should be able to 
inject a failure by throwing some RuntimeException in an operator, for example 
{code:java}
builder.stream("source-topic").selectKey((k, v) -> {throw new 
RuntimeException();});
{code}
If you're able to put together a minimal example to reproduce this using the 
above, that would help us to investigate this. Also, just to be clear, what 
_should_ be happening is to print this stacktrace all on one line, rather than 
line-by-line, is that right?

One more thing: can you try using the old (now deprecated) uncaught exception 
handler on version 2.8, to help isolate the problem?
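
For isolation, something like this sketch (assuming a configured KafkaStreams instance named {{streams}} and an SLF4J logger named {{log}}) registers the old, now-deprecated handler instead of the 2.8 one:
{code:java}
// Old (pre-2.8, now deprecated) handler API; logs through SLF4J like any other statement.
streams.setUncaughtExceptionHandler((Thread thread, Throwable exception) ->
    log.warn("Uncaught exception in {}", thread.getName(), exception));
{code}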

> kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through 
> log4j
> 
>
> Key: KAFKA-12774
> URL: https://issues.apache.org/jira/browse/KAFKA-12774
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Jørgen
>Priority: Minor
>
> When exceptions is handled in the uncaught-exception handler introduced in 
> KS2.8, the logging of the stacktrace doesn't seem to go through the logging 
> framework configured by the application (log4j2 in our case), but gets 
> printed to console "line-by-line".
> All other exceptions logged by kafka-streams go through log4j2 and gets 
> formatted properly according to the log4j2 appender (json in our case). 
> Haven't tested this on other frameworks like logback.
> Application setup:
>  * Spring-boot 2.4.5
>  * Log4j 2.13.3
>  * Slf4j 1.7.30
> Log4j2 appender config:
> {code:java}
> 
> 
>  stacktraceAsString="true" properties="true">
>  value="$${date:-MM-dd'T'HH:mm:ss.SSSZ}"/>
> 
> 
>  {code}
> Uncaught exception handler config:
> {code:java}
> kafkaStreams.setUncaughtExceptionHandler { exception ->
> logger.warn("Uncaught exception handled - replacing thread", exception) 
> // logged properly
> 
> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
> } {code}
> Stacktrace that gets printed line-by-line:
> {code:java}
> Exception in thread "xxx-f5860dff-9a41-490e-8ab0-540b1a7f9ce4-StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic xxx-repartition for task 3_2 due 
> to:org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.Exception handler choose to FAIL the processing, no more 
> records would be sent.  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:226)
>at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:783)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:430)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:315)  
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
>   at java.base/java.lang.Thread.run(Unknown Source)Caused by: 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id. {code}
>  
> It's a little bit hard to reproduce as I haven't found any way to trigger 
> uncaught-exception-handler through junit-tests.
> Link to discussion on slack: 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1620389197436700



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde

2021-05-12 Thread GitBox


ableegoldman commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r631334877



##
File path: 
clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
##
@@ -106,6 +110,190 @@ public void stringSerdeShouldSupportDifferentEncodings() {
 }
 }
 
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldReturnEmptyCollection() {
+List testData = Arrays.asList();
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Integer());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, 
listSerde.serializer().serialize(topic, testData)),
+"Should get empty collection after serialization and 
deserialization on an empty list");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldReturnNull() {
+List testData = null;
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Integer());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, 
listSerde.serializer().serialize(topic, testData)),
+"Should get null after serialization and deserialization on an 
empty list");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldRoundtripIntPrimitiveInput() {
+List testData = Arrays.asList(1, 2, 3);
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Integer());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, 
listSerde.serializer().serialize(topic, testData)),
+"Should get the original collection of integer primitives after 
serialization and deserialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void 
listSerdeSerializerShouldReturnByteArrayOfFixedSizeForIntPrimitiveInput() {
+List testData = Arrays.asList(1, 2, 3);
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Integer());
+assertEquals(21, listSerde.serializer().serialize(topic, 
testData).length,
+"Should get length of 21 bytes after serialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldRoundtripShortPrimitiveInput() {
+List testData = Arrays.asList((short) 1, (short) 2, (short) 3);
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Short());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, 
listSerde.serializer().serialize(topic, testData)),
+"Should get the original collection of short primitives after 
serialization and deserialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void 
listSerdeSerializerShouldReturnByteArrayOfFixedSizeForShortPrimitiveInput() {
+List testData = Arrays.asList((short) 1, (short) 2, (short) 3);
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Short());
+assertEquals(15, listSerde.serializer().serialize(topic, 
testData).length,
+"Should get length of 15 bytes after serialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldRoundtripFloatPrimitiveInput() {
+List testData = Arrays.asList((float) 1, (float) 2, (float) 3);
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Float());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, 
listSerde.serializer().serialize(topic, testData)),
+"Should get the original collection of float primitives after 
serialization and deserialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void 
listSerdeSerializerShouldReturnByteArrayOfFixedSizeForFloatPrimitiveInput() {
+List testData = Arrays.asList((float) 1, (float) 2, (float) 3);
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Float());
+assertEquals(21, listSerde.serializer().serialize(topic, 
testData).length,
+"Should get length of 21 bytes after serialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldRoundtripLongPrimitiveInput() {
+List testData = Arrays.asList((long) 1, (long) 2, (long) 3);
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Long());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, 
listSerde.serializer().serialize(topic, testData)),
+"Should get the original collection of long primitives after 
serialization and deserialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void 
listSerdeSerializerShouldReturnByteArrayOfFixedSizeForLongPrimitiveInput() {
+List testData = Arrays.asList((long) 1, (long) 2, (long) 3);
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 

[GitHub] [kafka] ableegoldman commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde

2021-05-12 Thread GitBox


ableegoldman commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r631328727



##
File path: docs/streams/upgrade-guide.html
##
@@ -1104,6 +1104,12 @@ 
  JoinWindows has no default size anymore: 
JoinWindows.of("name").within(1000) changes to JoinWindows.of(1000) 
 
 
+ New serde type ListSerde: 
+
+ added class ListSerde to (de)serialize 
List-based objects 
+ introduced ListSerializer and 
ListDeserializer to power the new functionality 
+
+

Review comment:
   Can you move this up to the top of this file, under the `Streams changes 
in 3.0` section? It's in reverse order, so the newest stuff goes at the top.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde

2021-05-12 Thread GitBox


ableegoldman commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r631327191



##
File path: 
clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class ListDeserializer implements Deserializer> {
+
+private static final Map>, Integer> 
FIXED_LENGTH_DESERIALIZERS = mkMap(
+mkEntry(ShortDeserializer.class, Short.BYTES),
+mkEntry(IntegerDeserializer.class, Integer.BYTES),
+mkEntry(FloatDeserializer.class, Float.BYTES),
+mkEntry(LongDeserializer.class, Long.BYTES),
+mkEntry(DoubleDeserializer.class, Double.BYTES),
+mkEntry(UUIDDeserializer.class, 36)
+);
+
+private Deserializer inner;
+private Class listClass;
+private Integer primitiveSize;
+
+public ListDeserializer() {}
+
+public > ListDeserializer(Class listClass, 
Deserializer inner) {
+if (listClass == null || inner == null) {
+throw new IllegalArgumentException("ListDeserializer requires both 
\"listClass\" and \"innerDeserializer\" parameters to be provided during 
initialization");
+}
+this.listClass = listClass;
+this.inner = inner;
+this.primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass());
+}
+
+public Deserializer getInnerDeserializer() {
+return inner;
+}
+
+@Override
+public void configure(Map configs, boolean isKey) {
+if (listClass != null || inner != null) {
+throw new ConfigException("List deserializer was already 
initialized using a non-default constructor");
+}
+configureListClass(configs, isKey);
+configureInnerSerde(configs, isKey);
+}
+
+private void configureListClass(Map configs, boolean isKey) {
+String listTypePropertyName = isKey ? 
CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS : 
CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS;
+final Object listClassOrName = configs.get(listTypePropertyName);
+if (listClassOrName == null) {
+throw new ConfigException("Not able to determine the list class 
because it was neither passed via the constructor nor set in the config.");
+}
+try {
+if (listClassOrName instanceof String) {
+listClass = Utils.loadClass((String) listClassOrName, 
Object.class);
+} else if (listClassOrName instanceof Class) {
+listClass = (Class) listClassOrName;
+} else {
+throw new KafkaException("Could not determine the list class 
instance using \"" + listTypePropertyName + "\" property.");
+}
+} catch (final ClassNotFoundException e) {
+throw new ConfigException(listTypePropertyName, listClassOrName, 
"Deserializer's list class \"" + listClassOrName + "\" could not be found.");
+}
+}
+
+@SuppressWarnings("unchecked")
+private void configureInnerSerde(Map configs, boolean isKey) {
+String innerSerdePropertyName = isKey ? 
CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : 
CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS;
+final Object innerSerdeClassOrName = 
configs.get(innerSerdePropertyName);
+if (innerSerdeClassOrName == null) {
+throw 

[GitHub] [kafka] yeralin commented on pull request #6592: KAFKA-8326: Introduce List Serde

2021-05-12 Thread GitBox


yeralin commented on pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#issuecomment-840015831


   Ok, updated the KIP with serializing nulls for different strategies.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12775) StreamsPartitionAssignor / ClientState throws an exception when a new Task gets added to a KStreams Application in a Backwards-Compatible Topology Change

2021-05-12 Thread Nico Habermann (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Habermann resolved KAFKA-12775.

Resolution: Invalid

> StreamsPartitionAssignor / ClientState throws an exception when a new Task 
> gets added to a KStreams Application in a Backwards-Compatible Topology Change
> -
>
> Key: KAFKA-12775
> URL: https://issues.apache.org/jira/browse/KAFKA-12775
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Nico Habermann
>Priority: Major
>
> KAFKA-6145 and KAFKA-10079 added an [exception if the Partition Assignor 
> tries to look up the lag for a TaskId that seemingly does not 
> exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].
>  
> I believe this is a functional regression.
> Before, it was possible for Streams users to make backwards-compatible 
> topology changes and roll those out, without having to do a complete restore 
> or reload.
> For example:
> Existing sample topology:
>  
> {code:java}
> stream1 = stream(topic)
> stream1
>   .map(...)
>   .to(output){code}
> And doing this backwards-compatible change:
> {code:java}
> stream1 = stream(topic)
> ++ table = stream(topic2).through(repartition-topic)/repartition().toTable()
> stream1
>   .map(...)
> ++  .join(table)
>   .to(output){code}
>  
> This effectively creates a new subtopology with a new task for the table 
> repartition.
> In older KStreams versions, it would have been possible to simply roll this 
> change out.
> Since 2.6, rolling this out will crash the stream because the linked 
> exception gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag  
> tries to look up the lag for the new table-repartition-task
>  
> At this time, the only possible way to avoid this exception seems to be 
> deleting all local state and doing a complete restore with the new topology 
> change included.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12775) StreamsPartitionAssignor / ClientState throws an exception when a new Task gets added to a KStreams Application in a Backwards-Compatible Topology Change

2021-05-12 Thread Nico Habermann (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343430#comment-17343430
 ] 

Nico Habermann commented on KAFKA-12775:


Closing the ticket since KStreams was not meant to be used in such a manner - 
Sorry for the misunderstanding!

> StreamsPartitionAssignor / ClientState throws an exception when a new Task 
> gets added to a KStreams Application in a Backwards-Compatible Topology Change
> -
>
> Key: KAFKA-12775
> URL: https://issues.apache.org/jira/browse/KAFKA-12775
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Nico Habermann
>Priority: Major
>
> KAFKA-6145 and KAFKA-10079 added an [exception if the Partition Assignor 
> tries to look up the lag for a TaskId that seemingly does not 
> exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].
>  
> I believe this is a functional regression.
> Before, it was possible for Streams users to make backwards-compatible 
> topology changes and roll those out, without having to do a complete restore 
> or reload.
> For example:
> Existing sample topology:
>  
> {code:java}
> stream1 = stream(topic)
> stream1
>   .map(...)
>   .to(output){code}
> And doing this backwards-compatible change:
> {code:java}
> stream1 = stream(topic)
> ++ table = stream(topic2).through(repartition-topic)/repartition().toTable()
> stream1
>   .map(...)
> ++  .join(table)
>   .to(output){code}
>  
> This effectively creates a new subtopology with a new task for the table 
> repartition.
> In older KStreams versions, it would have been possible to simply roll this 
> change out.
> Since 2.6, rolling this out will crash the stream because the linked 
> exception gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag  
> tries to look up the lag for the new table-repartition-task
>  
> At this time, the only possible way to avoid this exception seems to be 
> deleting all local state and doing a complete restore with the new topology 
> change included.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dielhennr opened a new pull request #10679: KAFKA-12697: Add Global Topic and Partition count metrics to the Quorum Controller

2021-05-12 Thread GitBox


dielhennr opened a new pull request #10679:
URL: https://github.com/apache/kafka/pull/10679


   Added GlobalTopicCount and GlobalPartitionCount metrics to the 
QuorumControllerMetrics.
   
   The metrics are calculated by counting records as they are replayed, e.g. in 
replay(TopicRecord) and replay(RemoveTopicRecord).
   
   This was unit tested using MockControllerMetrics.
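
   As a rough illustration of that replay-counting approach (the class and 
method names below are simplified placeholders, not the actual controller 
code), the counters are adjusted while records are applied so that reading a 
metric never has to scan the metadata:

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

class ReplayCountingSketch {
    private final AtomicInteger globalTopicCount = new AtomicInteger(0);
    private final AtomicInteger globalPartitionCount = new AtomicInteger(0);

    void replayTopicRecord() {
        globalTopicCount.incrementAndGet();
    }

    void replayPartitionRecord() {
        globalPartitionCount.incrementAndGet();
    }

    void replayRemoveTopicRecord(int partitionsOfRemovedTopic) {
        globalTopicCount.decrementAndGet();
        globalPartitionCount.addAndGet(-partitionsOfRemovedTopic);
    }

    // The metric gauges read these counters directly, so a fetch is O(1).
    int globalTopicCount() {
        return globalTopicCount.get();
    }

    int globalPartitionCount() {
        return globalPartitionCount.get();
    }
}
{code}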


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset

2021-05-12 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343401#comment-17343401
 ] 

Travis Bischel commented on KAFKA-12671:


Hi [~Peyer],

I believe that the Java producer has taken the same approach that I ultimately 
ended up on, which is to disallow an EndTxn request if there are any unresolved 
records. By forcing all records to be in a known-to-the-client stable state, 
and ensuring that no Produce request is in flight, this issue is largely 
avoided.

However, that does not mean the issue does not exist, and a faulty client or 
potentially some unforeseen bugs, or worse a malicious client, can easily 
trigger this: if you manually issue an InitProducerID request and then a 
ProduceRequest with the producer id, this issue happens. Partitions will then 
have a stuck LastStableOffset.

Again, this is because a Produce request begins a transaction in the 
ProduceStateManager, yet no transaction was begun in the TxnCoordinator, and 
nothing will ever end the transaction that was begun in the ProduceStateManager.
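
For context, here is a minimal sketch of the intended ordering using the 
standard Java producer; the topic name, transactional id, and bootstrap address 
are placeholders. The hang described above requires the raw protocol requests 
to be issued out of this order, which the Java client normally prevents by 
draining in-flight records before sending EndTxn.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalFlowSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-txn-id");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();                                    // InitProducerId
            producer.beginTransaction();                                    // AddPartitionsToTxn on first send
            producer.send(new ProducerRecord<>("example-topic", "k", "v")); // Produce
            producer.commitTransaction();                                   // EndTxn, after sends are drained
        }
    }
}
{code}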

> Out of order processing with a transactional producer can lead to a stuck 
> LastStableOffset
> --
>
> Key: KAFKA-12671
> URL: https://issues.apache.org/jira/browse/KAFKA-12671
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
>Reporter: Travis Bischel
>Priority: Major
>  Labels: Transactions
>
> If there is pathological processing of incoming produce requests and EndTxn 
> requests, then the LastStableOffset can get stuck, which will block consuming 
> in READ_COMMITTED mode.
> To transactionally produce, the standard flow is to InitProducerId, and then 
> loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is 
> responsible for fencing and adding partitions to a transaction, and the end 
> transaction is responsible for finishing the transaction. Producing itself is 
> mostly uninvolved with the proper fencing / ending flow, but produce requests 
> are required to be after AddPartitionsToTxn and before EndTxn.
> When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager 
> to mildly manage transactions. The ProducerStateManager is completely 
> independent of the TxnCoordinator, and its guarantees are relatively weak. 
> The ProducerStateManager handles two types of "batches" being added: a data 
> batch and a transaction marker. When a data batch is added, a "transaction" 
> is begun and tied to the producer ID that is producing the batch. When a 
> transaction marker is handled, the ProducerStateManager removes the 
> transaction for the producer ID (roughly).
> EndTxn is what triggers transaction markers to be sent to the 
> ProducerStateManager. In essence, EndTxn is the one part of the transactional 
> producer flow that talks across both the TxnCoordinator and the 
> ProducerStateManager.
> If a ProduceRequest is issued before EndTxn, but handled internally in Kafka 
> after EndTxn, then the ProduceRequest will begin a new transaction in the 
> ProducerStateManager. If the client was disconnecting, and the EndTxn was the 
> final request issued, the new transaction created in ProducerStateManager is 
> orphaned and nothing can clean it up. The LastStableOffset then hangs based 
> off of this hung transaction.
> This same problem can be triggered by a produce request that is issued with a 
> transactional ID outside of the context of a transaction at all (no 
> AddPartitionsToTxn). This problem cannot be triggered by producing for so 
> long that the transaction expires; the difference here is that the 
> transaction coordinator bumps the epoch for the producer ID, thus producing 
> again with the old epoch does not work.
> Theoretically, we are supposed to have unlimited retries on produce requests, 
> but in the context of wanting to abort everything and shut down, this is not 
> always feasible. As it currently stands, I'm not sure there's a truly safe 
> way to shut down _without_ flushing and receiving responses for every record 
> produced, even if I want to abort everything and quit. The safest approach I 
> can think of is to actually avoid issuing an EndTxn so that instead we rely 
> on Kafka internally to time things out after a period of time.
> —
> For some context, here's my request logs from the client. Note that I write 
> two ProduceRequests, read one, and then issue EndTxn (because I know I want 
> to quit). The second ProduceRequest is read successfully before shutdown, but 
> I ignore the results because I am shutting down. I've taken out logs related 
> to consuming, but the order of the logs is unchanged:
> {noformat}
> [INFO] done waiting for unknown topic, metadata was successful; topic: 
> 

[GitHub] [kafka] gharris1727 commented on pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences

2021-05-12 Thread GitBox


gharris1727 commented on pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#issuecomment-839961263


   Hi @kkonstantine @rhauch I've updated the comments and cleaned up some 
names, and applied your suggested changes. PTAL, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset

2021-05-12 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17340576#comment-17340576
 ] 

Travis Bischel edited comment on KAFKA-12671 at 5/12/21, 5:25 PM:
--

After much investigating, a potential fix is to issue an AddPartitionsToTxn 
request with the producer ID, epoch, transactional ID that caused partitions to 
get stuck, as well as _all_ topics/partitions that are stuck, and then follow 
that with an EndTxn request (w/ pid, producer epoch, txn id).

Finding the pid, producer epoch, and transactional ID is the hard part.

My process to find those three was as follows:

- kcl consume --read-uncommitted -p  -o  
-num 3 -f '%x %y %|\n'
- kcl consume __transaction_state | grep 'ProducerID.*\b\b' -A 7 -B 4



was (Author: twmb):
After much investigating, a potential fix is to issue an AddPartitionsToTxn 
request with the producer ID, epoch, transactional ID that caused partitions to 
get stuck, as well as _all_ topics/partitions that are stuck, and then follow 
that with an EndTxn request (w/ pid, producer epoch, txn id).

Finding the pid, producer epoch, and transactional ID is the hard part.

My process to find those three was as follows:

- kcl consume --read-uncommitted -p  -o  
-num 3 -f '%x %y %|\n'
- kcl consume __transaction_state | grep 'ProducerID.*\b\b -A 7 -B 4


> Out of order processing with a transactional producer can lead to a stuck 
> LastStableOffset
> --
>
> Key: KAFKA-12671
> URL: https://issues.apache.org/jira/browse/KAFKA-12671
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
>Reporter: Travis Bischel
>Priority: Major
>  Labels: Transactions
>
> If there is pathological processing of incoming produce requests and EndTxn 
> requests, then the LastStableOffset can get stuck, which will block consuming 
> in READ_COMMITTED mode.
> To transactionally produce, the standard flow is to InitProducerId, and then 
> loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is 
> responsible for fencing and adding partitions to a transaction, and the end 
> transaction is responsible for finishing the transaction. Producing itself is 
> mostly uninvolved with the proper fencing / ending flow, but produce requests 
> are required to be after AddPartitionsToTxn and before EndTxn.
> When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager 
> to mildly manage transactions. The ProducerStateManager is completely 
> independent of the TxnCoordinator, and its guarantees are relatively weak. 
> The ProducerStateManager handles two types of "batches" being added: a data 
> batch and a transaction marker. When a data batch is added, a "transaction" 
> is begun and tied to the producer ID that is producing the batch. When a 
> transaction marker is handled, the ProducerStateManager removes the 
> transaction for the producer ID (roughly).
> EndTxn is what triggers transaction markers to be sent to the 
> ProducerStateManager. In essence, EndTxn is the one part of the transactional 
> producer flow that talks across both the TxnCoordinator and the 
> ProducerStateManager.
> If a ProduceRequest is issued before EndTxn, but handled internally in Kafka 
> after EndTxn, then the ProduceRequest will begin a new transaction in the 
> ProducerStateManager. If the client was disconnecting, and the EndTxn was the 
> final request issued, the new transaction created in ProducerStateManager is 
> orphaned and nothing can clean it up. The LastStableOffset then hangs based 
> off of this hung transaction.
> This same problem can be triggered by a produce request that is issued with a 
> transactional ID outside of the context of a transaction at all (no 
> AddPartitionsToTxn). This problem cannot be triggered by producing for so 
> long that the transaction expires; the difference here is that the 
> transaction coordinator bumps the epoch for the producer ID, thus producing 
> again with the old epoch does not work.
> Theoretically, we are supposed to have unlimited retries on produce requests, 
> but in the context of wanting to abort everything and shut down, this is not 
> always feasible. As it currently stands, I'm not sure there's a truly safe 
> way to shut down _without_ flushing and receiving responses for every record 
> produced, even if I want to abort everything and quit. The safest approach I 
> can think of is to actually avoid issuing an EndTxn so that instead we rely 
> on Kafka internally to time things out after a period of time.
> —
> For some context, here's my request logs from the client. Note that I write 
> two ProduceRequests, read one, and then issue EndTxn (because I know I want 
> to quit). The second 

[jira] [Updated] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset

2021-05-12 Thread Travis Bischel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Travis Bischel updated KAFKA-12671:
---
Priority: Major  (was: Blocker)

> Out of order processing with a transactional producer can lead to a stuck 
> LastStableOffset
> --
>
> Key: KAFKA-12671
> URL: https://issues.apache.org/jira/browse/KAFKA-12671
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
>Reporter: Travis Bischel
>Priority: Major
>  Labels: Transactions
>
> If there is pathological processing of incoming produce requests and EndTxn 
> requests, then the LastStableOffset can get stuck, which will block consuming 
> in READ_COMMITTED mode.
> To transactionally produce, the standard flow is to InitProducerId, and then 
> loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is 
> responsible for fencing and adding partitions to a transaction, and the end 
> transaction is responsible for finishing the transaction. Producing itself is 
> mostly uninvolved with the proper fencing / ending flow, but produce requests 
> are required to be after AddPartitionsToTxn and before EndTxn.
> When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager 
> to mildly manage transactions. The ProducerStateManager is completely 
> independent of the TxnCoordinator, and its guarantees are relatively weak. 
> The ProducerStateManager handles two types of "batches" being added: a data 
> batch and a transaction marker. When a data batch is added, a "transaction" 
> is begun and tied to the producer ID that is producing the batch. When a 
> transaction marker is handled, the ProducerStateManager removes the 
> transaction for the producer ID (roughly).
> EndTxn is what triggers transaction markers to be sent to the 
> ProducerStateManager. In essence, EndTxn is the one part of the transactional 
> producer flow that talks across both the TxnCoordinator and the 
> ProducerStateManager.
> If a ProduceRequest is issued before EndTxn, but handled internally in Kafka 
> after EndTxn, then the ProduceRequest will begin a new transaction in the 
> ProducerStateManager. If the client was disconnecting, and the EndTxn was the 
> final request issued, the new transaction created in ProducerStateManager is 
> orphaned and nothing can clean it up. The LastStableOffset then hangs based 
> off of this hung transaction.
> This same problem can be triggered by a produce request that is issued with a 
> transactional ID outside of the context of a transaction at all (no 
> AddPartitionsToTxn). This problem cannot be triggered by producing for so 
> long that the transaction expires; the difference here is that the 
> transaction coordinator bumps the epoch for the producer ID, thus producing 
> again with the old epoch does not work.
> Theoretically, we are supposed to have unlimited retries on produce requests, 
> but in the context of wanting to abort everything and shut down, this is not 
> always feasible. As it currently stands, I'm not sure there's a truly safe 
> way to shut down _without_ flushing and receiving responses for every record 
> produced, even if I want to abort everything and quit. The safest approach I 
> can think of is to actually avoid issuing an EndTxn so that instead we rely 
> on Kafka internally to time things out after a period of time.
> —
> For some context, here's my request logs from the client. Note that I write 
> two ProduceRequests, read one, and then issue EndTxn (because I know I want 
> to quit). The second ProduceRequest is read successfully before shutdown, but 
> I ignore the results because I am shutting down. I've taken out logs related 
> to consuming, but the order of the logs is unchanged:
> {noformat}
> [INFO] done waiting for unknown topic, metadata was successful; topic: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765
> [INFO] initializing producer id
> [DEBUG] wrote FindCoordinator v3; err: 
> [DEBUG] read FindCoordinator v3; err: 
> [DEBUG] wrote InitProducerID v4; err: 
> [DEBUG] read InitProducerID v4; err: 
> [INFO] producer id initialization success; id: 1463, epoch: 0
> [DEBUG] wrote AddPartitionsToTxn v2; err: 
> [DEBUG] read AddPartitionsToTxn v2; err: 
> [DEBUG] wrote Produce v8; err: 
> [DEBUG] read Produce v8; err: 
> [DEBUG] produced; to: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}]
> [DEBUG] wrote Produce v8; err: 
> [DEBUG] wrote EndTxn v2; err: 
> [DEBUG] read EndTxn v2; err: 
> [DEBUG] read from broker errored, killing connection; addr: localhost:9092, 
> id: 1, successful_reads: 1, err: context canceled
> [DEBUG] read Produce v8; err: 
> [DEBUG] produced; to: 
> 

[GitHub] [kafka] dejan2609 commented on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-12 Thread GitBox


dejan2609 commented on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-839951560


   Probably they are... I will try to isolate issue(s) and compare it with 
Gradle 7 release notes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12775) StreamsPartitionAssignor / ClientState throws an exception when a new Task gets added to a KStreams Application in a Backwards-Compatible Topology Change

2021-05-12 Thread Nico Habermann (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343340#comment-17343340
 ] 

Nico Habermann commented on KAFKA-12775:


[~cadonna] I see, thanks for the quick answer. To be honest, that is a bit 
disappointing: being able to roll out certain changes "online" without doing 
a reset was quite useful in our production environment once you had a certain 
familiarity with KStreams. I actually thought it was built that way on purpose 
(hence the bug ticket).



> StreamsPartitionAssignor / ClientState throws an exception when a new Task 
> gets added to a KStreams Application in a Backwards-Compatible Topology Change
> -
>
> Key: KAFKA-12775
> URL: https://issues.apache.org/jira/browse/KAFKA-12775
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Nico Habermann
>Priority: Major
>
> KAFKA-6145 and KAFKA-10079 added an [exception if the Partition Assignor 
> tries to look up the lag for a TaskId that seemingly does not 
> exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].
>  
> I believe this is a functional regression.
> Before, it was possible for Streams users to make backwards-compatible 
> topology changes and roll those out, without having to do a complete restore 
> or reload.
> For example:
> Existing sample topology:
>  
> {code:java}
> stream1 = stream(topic)
> stream1
>   .map(...)
>   .to(output){code}
> And doing this backwards-compatible change:
> {code:java}
> stream1 = stream(topic)
> ++ table = stream(topic2).through(repartition-topic)/repartition().toTable()
> stream1
>   .map(...)
> ++  .join(table)
>   .to(output){code}
>  
> This effectively creates a new subtopology with a new task for the table 
> repartition.
> In older KStreams versions, it would have been possible to simply roll this 
> change out.
> Since 2.6, rolling this out will crash the stream because the linked 
> exception gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag  
> tries to look up the lag for the new table-repartition-task
>  
> At this time, the only possible way to avoid this exception seems to be 
> deleting all local state and doing a complete restore with the new topology 
> change included.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12775) StreamsPartitionAssignor / ClientState throws an exception when a new Task gets added to a KStreams Application in a Backwards-Compatible Topology Change

2021-05-12 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343328#comment-17343328
 ] 

Bruno Cadonna commented on KAFKA-12775:
---

[~nhab] Thank you for filing this ticket and the minimal example.

For topology changes, we usually recommend resetting the Streams application. 
There are certainly cases that do not cause issues without a reset, but in 
general Kafka Streams has never guaranteed backward compatibility when the 
topology changes. There are some rough ideas about how to approach this, but 
nothing is implemented yet.
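
For reference, a reset typically combines the application reset tool with 
clearing local state before restarting. The following is only a hedged sketch; 
the application id, topics, topology, and tool flags are placeholders and may 
differ by version.

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ResetSketch {
    public static void main(String[] args) {
        // Step 1 (outside the JVM): run the reset tool against the cluster, e.g.
        //   bin/kafka-streams-application-reset.sh --application-id my-app \
        //       --input-topics topic,topic2 --bootstrap-servers localhost:9092

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("topic").to("output");          // placeholder topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.cleanUp();   // Step 2: wipe this instance's local state stores; call before start()
        streams.start();
    }
}
{code}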

> StreamsPartitionAssignor / ClientState throws an exception when a new Task 
> gets added to a KStreams Application in a Backwards-Compatible Topology Change
> -
>
> Key: KAFKA-12775
> URL: https://issues.apache.org/jira/browse/KAFKA-12775
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Nico Habermann
>Priority: Major
>
> KAFKA-6145 and KAFKA-10079 added an [exception if the Partition Assignor 
> tries to look up the lag for a TaskId that seemingly does not 
> exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].
>  
> I believe this is a functional regression.
> Before, it was possible for Streams users to make backwards-compatible 
> topology changes and roll those out, without having to do a complete restore 
> or reload.
> For example:
> Existing sample topology:
>  
> {code:java}
> stream1 = stream(topic)
> stream1
>   .map(...)
>   .to(output){code}
> And doing this backwards-compatible change:
> {code:java}
> stream1 = stream(topic)
> ++ table = stream(topic2).through(repartition-topic)/repartition().toTable()
> stream1
>   .map(...)
> ++  .join(table)
>   .to(output){code}
>  
> This effectively creates a new subtopology with a new task for the table 
> repartition.
> In older KStreams versions, it would have been possible to simply roll this 
> change out.
> Since 2.6, rolling this out will crash the stream because the linked 
> exception gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag  
> tries to look up the lag for the new table-repartition-task
>  
> At this time, the only possible way to avoid this exception seems to be 
> deleting all local state and doing a complete restore with the new topology 
> change included.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] yeralin commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde

2021-05-12 Thread GitBox


yeralin commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r631158775



##
File path: 
clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
##
@@ -106,6 +110,190 @@ public void stringSerdeShouldSupportDifferentEncodings() {
 }
 }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldReturnEmptyCollection() {
+        List<Integer> testData = Arrays.asList();
+        Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get empty collection after serialization and deserialization on an empty list");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldReturnNull() {
+        List<Integer> testData = null;
+        Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get null after serialization and deserialization on an empty list");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldRoundtripIntPrimitiveInput() {
+        List<Integer> testData = Arrays.asList(1, 2, 3);
+        Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get the original collection of integer primitives after serialization and deserialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForIntPrimitiveInput() {
+        List<Integer> testData = Arrays.asList(1, 2, 3);
+        Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer());
+        assertEquals(21, listSerde.serializer().serialize(topic, testData).length,
+            "Should get length of 21 bytes after serialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldRoundtripShortPrimitiveInput() {
+        List<Short> testData = Arrays.asList((short) 1, (short) 2, (short) 3);
+        Serde<List<Short>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Short());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get the original collection of short primitives after serialization and deserialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForShortPrimitiveInput() {
+        List<Short> testData = Arrays.asList((short) 1, (short) 2, (short) 3);
+        Serde<List<Short>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Short());
+        assertEquals(15, listSerde.serializer().serialize(topic, testData).length,
+            "Should get length of 15 bytes after serialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldRoundtripFloatPrimitiveInput() {
+        List<Float> testData = Arrays.asList((float) 1, (float) 2, (float) 3);
+        Serde<List<Float>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Float());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get the original collection of float primitives after serialization and deserialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForFloatPrimitiveInput() {
+        List<Float> testData = Arrays.asList((float) 1, (float) 2, (float) 3);
+        Serde<List<Float>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Float());
+        assertEquals(21, listSerde.serializer().serialize(topic, testData).length,
+            "Should get length of 21 bytes after serialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldRoundtripLongPrimitiveInput() {
+        List<Long> testData = Arrays.asList((long) 1, (long) 2, (long) 3);
+        Serde<List<Long>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Long());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get the original collection of long primitives after serialization and deserialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForLongPrimitiveInput() {
+        List<Long> testData = Arrays.asList((long) 1, (long) 2, (long) 3);
+        Serde<List<Long>> listSerde = Serdes.ListSerde(ArrayList.class, 

[GitHub] [kafka] cmccabe commented on pull request #10572: KAFKA-12697: Add metrics to Quorum Controller

2021-05-12 Thread GitBox


cmccabe commented on pull request #10572:
URL: https://github.com/apache/kafka/pull/10572#issuecomment-839872278


   Can you create a PR that has just the global topics count and global 
partitions count?  Then we can keep the remaining metrics in this PR.  That 
will make it easier to review and get in.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10572: KAFKA-12697: Add metrics to Quorum Controller

2021-05-12 Thread GitBox


cmccabe commented on a change in pull request #10572:
URL: https://github.com/apache/kafka/pull/10572#discussion_r631152959



##
File path: metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
##
@@ -326,4 +326,14 @@ PartitionsOnReplicaIterator partitionsWithBrokerInIsr(int 
brokerId) {
 boolean hasLeaderships(int brokerId) {
 return iterator(brokerId, true).hasNext();
 }
+
+int offlinePartitionCount() {

Review comment:
   We cannot count through all the offline partitions this way every time 
metrics get fetched.  As much as possible, there should be no 
`O(num_partitions)` operations in metrics collection.
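
One common way to satisfy that constraint is to maintain the count 
incrementally wherever partition leadership changes, so the metric read itself 
is constant-time. The sketch below uses illustrative names only, not the actual 
`BrokersToIsrs` API.

{code:java}
class OfflinePartitionCounter {
    private int offlinePartitionCount = 0;

    // Invoked from the code path that applies a leadership change.
    void onPartitionLeaderChange(boolean wasOffline, boolean isOffline) {
        if (!wasOffline && isOffline) {
            offlinePartitionCount++;
        } else if (wasOffline && !isOffline) {
            offlinePartitionCount--;
        }
    }

    // O(1) read for the metrics gauge.
    int offlinePartitionCount() {
        return offlinePartitionCount;
    }
}
{code}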




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yeralin commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde

2021-05-12 Thread GitBox


yeralin commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r631151867



##
File path: 
clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class ListDeserializer<Inner> implements Deserializer<List<Inner>> {
+
+    private static final Map<Class<? extends Deserializer<?>>, Integer> FIXED_LENGTH_DESERIALIZERS = mkMap(
+        mkEntry(ShortDeserializer.class, Short.BYTES),
+        mkEntry(IntegerDeserializer.class, Integer.BYTES),
+        mkEntry(FloatDeserializer.class, Float.BYTES),
+        mkEntry(LongDeserializer.class, Long.BYTES),
+        mkEntry(DoubleDeserializer.class, Double.BYTES),
+        mkEntry(UUIDDeserializer.class, 36)
+    );
+
+    private Deserializer<Inner> inner;
+    private Class<?> listClass;
+    private Integer primitiveSize;
+
+    public ListDeserializer() {}
+
+    public <L extends List<Inner>> ListDeserializer(Class<L> listClass, Deserializer<Inner> inner) {
+        if (listClass == null || inner == null) {
+            throw new IllegalArgumentException("ListDeserializer requires both \"listClass\" and \"innerDeserializer\" parameters to be provided during initialization");
+        }
+        this.listClass = listClass;
+        this.inner = inner;
+        this.primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass());
+    }
+
+    public Deserializer<Inner> getInnerDeserializer() {
+        return inner;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        if (listClass != null || inner != null) {
+            throw new ConfigException("List deserializer was already initialized using a non-default constructor");
+        }
+        configureListClass(configs, isKey);
+        configureInnerSerde(configs, isKey);
+    }
+
+    private void configureListClass(Map<String, ?> configs, boolean isKey) {
+        String listTypePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS;
+        final Object listClassOrName = configs.get(listTypePropertyName);
+        if (listClassOrName == null) {
+            throw new ConfigException("Not able to determine the list class because it was neither passed via the constructor nor set in the config.");
+        }
+        try {
+            if (listClassOrName instanceof String) {
+                listClass = Utils.loadClass((String) listClassOrName, Object.class);
+            } else if (listClassOrName instanceof Class) {
+                listClass = (Class<?>) listClassOrName;
+            } else {
+                throw new KafkaException("Could not determine the list class instance using \"" + listTypePropertyName + "\" property.");
+            }
+        } catch (final ClassNotFoundException e) {
+            throw new ConfigException(listTypePropertyName, listClassOrName, "Deserializer's list class \"" + listClassOrName + "\" could not be found.");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void configureInnerSerde(Map<String, ?> configs, boolean isKey) {
+        String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS;
+        final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName);
+        if (innerSerdeClassOrName == null) {
+            throw new 

[jira] [Issue Comment Deleted] (KAFKA-12668) MockScheduler is not safe to use in concurrent code.

2021-05-12 Thread Maksim Iakunin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maksim Iakunin updated KAFKA-12668:
---
Comment: was deleted

(was: [~jagsancio], hello! 

Could you give me some test-case when a deadlock occurs? 

Cannot reproduce the bug by myself.)

> MockScheduler is not safe to use in concurrent code.
> 
>
> Key: KAFKA-12668
> URL: https://issues.apache.org/jira/browse/KAFKA-12668
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Jose Armando Garcia Sancio
>Assignee: Maksim Iakunin
>Priority: Major
>  Labels: newbie
>
> The current implementation of {{MockScheduler}} executes tasks in the same 
> stack when {{schedule}} is called. This violates {{Log}}'s assumption since 
> {{Log}} calls {{schedule}} while holding a lock. This can cause deadlock in 
> tests.
> One solution is to change {{MockSchedule}} {{schedule}} method so that 
> {{tick}} is not called. {{tick}} should be called by a stack (thread) that 
> doesn't hold any locks.
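
A minimal sketch of that direction, with illustrative names rather than the 
actual MockScheduler API: schedule() only records the task, and the test 
thread, which holds no locks, drives execution through tick().

{code:java}
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class NonReentrantMockScheduler {
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();

    // Called from production code, possibly while locks are held: only enqueue.
    void schedule(Runnable task) {
        tasks.add(task);
    }

    // Called from the test thread with no locks held: run whatever was scheduled.
    void tick() {
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
        }
    }
}
{code}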



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-12 Thread GitBox


ijuma commented on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-839854774


   I don't think the issues are related to the shadow plugin. They're related 
to the gradle upgrade.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #10654: MINOR: Update jacoco to 0.8.7 for JDK 16 support

2021-05-12 Thread GitBox


ijuma merged pull request #10654:
URL: https://github.com/apache/kafka/pull/10654


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10670) repo.maven.apache.org: Name or service not known

2021-05-12 Thread Lutz Weischer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343267#comment-17343267
 ] 

Lutz Weischer commented on KAFKA-10670:
---

v2.7.1 and current master work fine now. Thanks!  

> repo.maven.apache.org: Name or service not known
> 
>
> Key: KAFKA-10670
> URL: https://issues.apache.org/jira/browse/KAFKA-10670
> Project: Kafka
>  Issue Type: Bug
>  Components: build
> Environment: Fedora 33, Aarch64 
>Reporter: Lutz Weischer
>Priority: Minor
>
> ./gradlew jar 
> fails: 
> > Configure project :
> Building project 'core' with Scala version 2.13.3
> Building project 'streams-scala' with Scala version 2.13.3
> > Task :clients:processMessages
> MessageGenerator: processed 121 Kafka message JSON files(s).
> > Task :clients:compileJava FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':clients:compileJava'.
> > Could not resolve all files for configuration ':clients:compileClasspath'.
>> Could not resolve org.xerial.snappy:snappy-java:1.1.7.7.
>  Required by:
>  project :clients
>   > Could not resolve org.xerial.snappy:snappy-java:1.1.7.7.
>  > Could not get resource 
> 'https://repo.maven.apache.org/maven2/org/xerial/snappy/snappy-java/1.1.7.7/snappy-java-1.1.7.7.pom'.
> > Could not HEAD 
> 'https://repo.maven.apache.org/maven2/org/xerial/snappy/snappy-java/1.1.7.7/snappy-java-1.1.7.7.pom'.
>> repo.maven.apache.org: Name or service not known
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output. Run with --scan to get full insights.
> * Get more help at https://help.gradle.org
> Deprecated Gradle features were used in this build, making it incompatible 
> with Gradle 7.0.
> Use '--warning-mode all' to show the individual deprecation warnings.
> See 
> https://docs.gradle.org/6.7/userguide/command_line_interface.html#sec:command_line_warnings
> BUILD FAILED in 21s
> 4 actionable tasks: 4 executed



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset

2021-05-12 Thread Simon (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343255#comment-17343255
 ] 

Simon commented on KAFKA-12671:
---

[~twmb] can you provide producer code (Java?) that will trigger this error?
I would be quite interested in reproducing this problem with a Java client. Using 
your DEBUG log, I have not yet been able to reproduce it.

> Out of order processing with a transactional producer can lead to a stuck 
> LastStableOffset
> --
>
> Key: KAFKA-12671
> URL: https://issues.apache.org/jira/browse/KAFKA-12671
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
>Reporter: Travis Bischel
>Priority: Blocker
>  Labels: Transactions
>
> If there is pathological processing of incoming produce requests and EndTxn 
> requests, then the LastStableOffset can get stuck, which will block consuming 
> in READ_COMMITTED mode.
> To transactionally produce, the standard flow is to InitProducerId, and then 
> loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is 
> responsible for fencing and adding partitions to a transaction, and the end 
> transaction is responsible for finishing the transaction. Producing itself is 
> mostly uninvolved with the proper fencing / ending flow, but produce requests 
> are required to be after AddPartitionsToTxn and before EndTxn.
> When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager 
> to mildly manage transactions. The ProducerStateManager is completely 
> independent of the TxnCoordinator, and its guarantees are relatively weak. 
> The ProducerStateManager handles two types of "batches" being added: a data 
> batch and a transaction marker. When a data batch is added, a "transaction" 
> is begun and tied to the producer ID that is producing the batch. When a 
> transaction marker is handled, the ProducerStateManager removes the 
> transaction for the producer ID (roughly).
> EndTxn is what triggers transaction markers to be sent to the 
> ProducerStateManager. In essence, EndTxn is the one part of the transactional 
> producer flow that talks across both the TxnCoordinator and the 
> ProducerStateManager.
> If a ProduceRequest is issued before EndTxn, but handled internally in Kafka 
> after EndTxn, then the ProduceRequest will begin a new transaction in the 
> ProducerStateManager. If the client was disconnecting, and the EndTxn was the 
> final request issued, the new transaction created in ProducerStateManager is 
> orphaned and nothing can clean it up. The LastStableOffset then hangs based 
> off of this hung transaction.
> This same problem can be triggered by a produce request that is issued with a 
> transactional ID outside of the context of a transaction at all (no 
> AddPartitionsToTxn). This problem cannot be triggered by producing for so 
> long that the transaction expires; the difference here is that the 
> transaction coordinator bumps the epoch for the producer ID, thus producing 
> again with the old epoch does not work.
> Theoretically, we are supposed to have unlimited retries on produce requests, 
> but in the context of wanting to abort everything and shut down, this is not 
> always feasible. As it currently stands, I'm not sure there's a truly safe 
> way to shut down _without_ flushing and receiving responses for every record 
> produced, even if I want to abort everything and quit. The safest approach I 
> can think of is to actually avoid issuing an EndTxn so that instead we rely 
> on Kafka internally to time things out after a period of time.
> —
> For some context, here's my request logs from the client. Note that I write 
> two ProduceRequests, read one, and then issue EndTxn (because I know I want 
> to quit). The second ProduceRequest is read successfully before shutdown, but 
> I ignore the results because I am shutting down. I've taken out logs related 
> to consuming, but the order of the logs is unchanged:
> {noformat}
> [INFO] done waiting for unknown topic, metadata was successful; topic: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765
> [INFO] initializing producer id
> [DEBUG] wrote FindCoordinator v3; err: 
> [DEBUG] read FindCoordinator v3; err: 
> [DEBUG] wrote InitProducerID v4; err: 
> [DEBUG] read InitProducerID v4; err: 
> [INFO] producer id initialization success; id: 1463, epoch: 0
> [DEBUG] wrote AddPartitionsToTxn v2; err: 
> [DEBUG] read AddPartitionsToTxn v2; err: 
> [DEBUG] wrote Produce v8; err: 
> [DEBUG] read Produce v8; err: 
> [DEBUG] produced; to: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}]
> [DEBUG] wrote Produce v8; err: 
> [DEBUG] wrote EndTxn v2; err: 
> [DEBUG] read EndTxn v2; err: 
> [DEBUG] read from broker 

[jira] [Updated] (KAFKA-12775) StreamsPartitionAssignor / ClientState throws an exception when a new Task gets added to a KStreams Application in a Backwards-Compatible Topology Change

2021-05-12 Thread Nico Habermann (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Habermann updated KAFKA-12775:
---
Description: 
KAFKA-6145 and KAFKA-10079 added an [exception if the Partition Assignor tries 
to look up the lag for a TaskId that seemingly does not 
exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].

 

I believe this is a functional regression.

Before, it was possible for Streams users to make backwards-compatible topology 
changes and roll those out, without having to do a complete restore or reload.

For example:

Existing sample topology:
 
{code:java}
stream1 = stream(topic)

stream1
  .map(...)
  .to(output){code}

And doing this backwards-compatible change:

{code:java}
stream1 = stream(topic)
++ table = stream(topic2).through(repartition-topic)/repartition().toTable()

stream1
  .map(...)
++  .join(table)
  .to(output){code}
 

This effectively creates a new subtopology with a new task for the table 
repartition.

In older KStreams versions, it would have been possible to simply roll this 
change out.
Since 2.6, rolling this out will crash the stream because the linked exception 
gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag  tries to look 
up the lag for the new table-repartition-task

 

At this time, the only possible way to avoid this exception seems to be 
deleting all local state and doing a complete restore with the new topology 
change included.

 

  was:
KAFKA-6145 added an [exception if the Partition Assignor tries to look up the 
lag for a TaskId that seemingly does not 
exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].

 

I believe this is a functional regression.

Before, it was possible for Streams users to make backwards-compatible topology 
changes and roll those out, without having to do a complete restore or reload.

For example:

Existing sample topology:
 
{code:java}
stream1 = stream(topic)

stream1
  .map(...)
  .to(output){code}

And doing this backwards-compatible change:

{code:java}
stream1 = stream(topic)
++ table = stream(topic2).through(repartition-topic)/repartition().toTable()

stream1
  .map(...)
++  .join(table)
  .to(output){code}
 

This effectively creates a new subtopology with a new task for the table 
repartition.

In older KStreams versions, it would have been possible to simply roll this 
change out.
Since 2.6, rolling this out will crash the stream because the linked exception 
gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag  tries to look 
up the lag for the new table-repartition-task

 

At this time, the only possible way to avoid this exception seems to be 
deleting all local state and doing a complete restore with the new topology 
change included.

 


> StreamsPartitionAssignor / ClientState throws an exception when a new Task 
> gets added to a KStreams Application in a Backwards-Compatible Topology Change
> -
>
> Key: KAFKA-12775
> URL: https://issues.apache.org/jira/browse/KAFKA-12775
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Nico Habermann
>Priority: Major
>
> KAFKA-6145 and KAFKA-10079 added an [exception if the Partition Assignor 
> tries to look up the lag for a TaskId that seemingly does not 
> exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].
>  
> I believe this is a functional regression.
> Before, it was possible for Streams users to make backwards-compatible 
> topology changes and roll those out, without having to do a complete restore 
> or reload.
> For example:
> Existing sample topology:
>  
> {code:java}
> stream1 = stream(topic)
> stream1
>   .map(...)
>   .to(output){code}
> And doing this backwards-compatible change:
> {code:java}
> stream1 = stream(topic)
> ++ table = stream(topic2).through(repartition-topic)/repartition().toTable()
> stream1
>   .map(...)
> ++  .join(table)
>   .to(output){code}
>  
> This effectively creates a new subtopology with a new task for the table 
> repartition.
> In older KStreams versions, it would have been possible to simply roll this 
> change out.
> Since 2.6, rolling this out will crash the stream because the linked 
> exception gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag  
> tries to look up the lag for the new table-repartition-task
>  
> At this time, the only possible way to avoid this exception seems to be 
> deleting all local state and doing a complete restore with the 

[jira] [Updated] (KAFKA-12775) StreamsPartitionAssignor / ClientState throws an exception when a new Task gets added to a KStreams Application in a Backwards-Compatible Topology Change

2021-05-12 Thread Nico Habermann (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Habermann updated KAFKA-12775:
---
Description: 
KAFKA-6145 added an [exception if the Partition Assignor tries to look up the 
lag for a TaskId that seemingly does not 
exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].

 

I believe this is a functional regression.

Before, it was possible for Streams users to make backwards-compatible topology 
changes and roll those out, without having to do a complete restore or reload.

For example:

Existing sample topology:
 
{code:java}
stream1 = stream(topic)

stream1
  .map(...)
  .to(output){code}

And doing this backwards-compatible change:

{code:java}
stream1 = stream(topic)
++ table = stream(topic2).through(repartition-topic)/repartition().toTable()

stream1
  .map(...)
++  .join(table)
  .to(output){code}
 

This effectively creates a new subtopology with a new task for the table 
repartition.

In older KStreams versions, it would have been possible to simply roll this 
change out.
Since 2.6, rolling this out will crash the stream because the linked exception 
gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag  tries to look 
up the lag for the new table-repartition-task

 

At this time, the only possible way to avoid this exception seems to be 
deleting all local state and doing a complete restore with the new topology 
change included.

 

  was:
KAFKA-6145 added an [exception if the new High Availability Assignor tries to 
look up the lag for a TaskId that seemingly does not 
exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].

 

I believe this is a functional regression.

Before, it was possible for Streams users to make backwards-compatible topology 
changes and roll those out, without having to do a complete restore or reload.

For example:

Existing sample topology:
 
{code:java}
stream1 = stream(topic)

stream1
  .map(...)
  .to(output){code}

And doing this backwards-compatible change:

{code:java}
stream1 = stream(topic)
++ table = stream(topic2).through(repartition-topic)/repartition().toTable()

stream1
  .map(...)
++  .join(table)
  .to(output){code}
 

This effectively creates a new subtopology with a new task for the table 
repartition.

In older KStreams versions, it would have been possible to simply roll this 
change out.
Since 2.6, rolling this out will crash the stream because the linked exception 
gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag  tries to look 
up the lag for the new table-repartition-task

 

At this time, the only possible way to avoid this exception seems to be 
deleting all local state and doing a complete restore with the new topology 
change included.

 


> StreamsPartitionAssignor / ClientState throws an exception when a new Task 
> gets added to a KStreams Application in a Backwards-Compatible Topology Change
> -
>
> Key: KAFKA-12775
> URL: https://issues.apache.org/jira/browse/KAFKA-12775
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Nico Habermann
>Priority: Major
>
> KAFKA-6145 added an [exception if the Partition Assignor tries to look up the 
> lag for a TaskId that seemingly does not 
> exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].
>  
> I believe this is a functional regression.
> Before, it was possible for Streams users to make backwards-compatible 
> topology changes and roll those out, without having to do a complete restore 
> or reload.
> For example:
> Existing sample topology:
>  
> {code:java}
> stream1 = stream(topic)
> stream1
>   .map(...)
>   .to(output){code}
> And doing this backwards-compatible change:
> {code:java}
> stream1 = stream(topic)
> ++ table = stream(topic2).through(repartition-topic)/repartition().toTable()
> stream1
>   .map(...)
> ++  .join(table)
>   .to(output){code}
>  
> This effectively creates a new subtopology with a new task for the table 
> repartition.
> In older KStreams versions, it would have been possible to simply roll this 
> change out.
> Since 2.6, rolling this out will crash the stream because the linked 
> exception gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag  
> tries to look up the lag for the new table-repartition-task
>  
> At this time, the only possible way to avoid this exception seems to be 
> deleting all local state and doing a complete restore with the new topology 
> 

[jira] [Updated] (KAFKA-12775) StreamsPartitionAssignor / ClientState throws an exception when a new Task gets added to a KStreams Application in a Backwards-Compatible Topology Change

2021-05-12 Thread Nico Habermann (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Habermann updated KAFKA-12775:
---
Component/s: streams

> StreamsPartitionAssignor / ClientState throws an exception when a new Task 
> gets added to a KStreams Application in a Backwards-Compatible Topology Change
> -
>
> Key: KAFKA-12775
> URL: https://issues.apache.org/jira/browse/KAFKA-12775
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Nico Habermann
>Priority: Major
>
> KAFKA-6145 added an [exception if the new High Availability Assignor tries to 
> look up the lag for a TaskId that seemingly does not 
> exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].
>  
> I believe this is a functional regression.
> Before, it was possible for Streams users to make backwards-compatible 
> topology changes and roll those out, without having to do a complete restore 
> or reload.
> For example:
> Existing sample topology:
>  
> {code:java}
> stream1 = stream(topic)
> stream1
>   .map(...)
>   .to(output){code}
> And doing this backwards-compatible change:
> {code:java}
> stream1 = stream(topic)
> ++ table = stream(topic2).through(repartition-topic)/repartition().toTable()
> stream1
>   .map(...)
> ++  .join(table)
>   .to(output){code}
>  
> This effectively creates a new subtopology with a new task for the table 
> repartition.
> In older KStreams versions, it would have been possible to simply roll this 
> change out.
> Since 2.6, rolling this out will crash the stream because the linked 
> exception gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag  
> tries to look up the lag for the new table-repartition-task
>  
> At this time, the only possible way to avoid this exception seems to be 
> deleting all local state and doing a complete restore with the new topology 
> change included.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12775) StreamsPartitionAssignor / ClientState throws an exception when a new Task gets added to a KStreams Application in a Backwards-Compatible Topology Change

2021-05-12 Thread Nico Habermann (Jira)
Nico Habermann created KAFKA-12775:
--

 Summary: StreamsPartitionAssignor / ClientState throws an 
exception when a new Task gets added to a KStreams Application in a 
Backwards-Compatible Topology Change
 Key: KAFKA-12775
 URL: https://issues.apache.org/jira/browse/KAFKA-12775
 Project: Kafka
  Issue Type: Bug
Reporter: Nico Habermann


KAFKA-6145 added an [exception if the new High Availability Assignor tries to 
look up the lag for a TaskId that seemingly does not 
exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].

 

I believe this is a functional regression.

Before, it was possible for Streams users to make backwards-compatible topology 
changes and roll those out, without having to do a complete restore or reload.

For example:

Existing sample topology:
 
{code:java}
stream1 = stream(topic)

stream1
  .map(...)
  .to(output){code}

And doing this backwards-compatible change:

{code:java}
stream1 = stream(topic)
++ table = stream(topic2).through(repartition-topic)/repartition().toTable()

stream1
  .map(...)
++  .join(table)
  .to(output){code}
 

This effectively creates a new subtopology with a new task for the table 
repartition.

In older KStreams versions, it would have been possible to simply roll this 
change out.
Since 2.6, rolling this out will crash the stream because the linked exception 
gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag  tries to look 
up the lag for the new table-repartition-task

 

At this time, the only possible way to avoid this exception seems to be 
deleting all local state and doing a complete restore with the new topology 
change included.
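
For illustration, a minimal Kafka Streams DSL sketch of the before/after topologies described above. Topic names, the mapper, the value joiner, and the repartition name are illustrative placeholders, not taken from the report:

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Repartitioned;

public class TopologyChangeSketch {

    // Existing topology: stream(topic) -> map -> to(output)
    static void buildOriginal(StreamsBuilder builder) {
        KStream<String, String> stream1 = builder.stream("topic");
        stream1
            .map((k, v) -> KeyValue.pair(k, v.toUpperCase()))
            .to("output");
    }

    // Changed topology: topic2 is piped through a repartition topic into a table
    // and joined into the existing stream. The repartition introduces a new
    // sub-topology, and therefore a new TaskId that the existing local state
    // knows nothing about.
    static void buildChanged(StreamsBuilder builder) {
        KTable<String, String> table = builder.<String, String>stream("topic2")
            .repartition(Repartitioned.as("table-repartition"))
            .toTable();

        KStream<String, String> stream1 = builder.stream("topic");
        stream1
            .map((k, v) -> KeyValue.pair(k, v.toUpperCase()))
            .join(table, (streamValue, tableValue) -> streamValue + tableValue)
            .to("output");
    }
}
{code}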

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12534) kafka-configs does not work with ssl enabled kafka broker.

2021-05-12 Thread Jordan Moore (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343202#comment-17343202
 ] 

Jordan Moore commented on KAFKA-12534:
--

According to the documentation linked below, the listener name (in lowercase) "must" be used as a prefix when setting the SSL properties.

So, for listeners=SSL://:9093

{code}
kafka-configs --command-config /etc/kafka/client.properties \
  --bootstrap-server hostname:port \
  --entity-type brokers --entity-name  --alter \
  --add-config listener.name.ssl.ssl.keystore.location=
{code}

https://docs.confluent.io/platform/current/kafka/dynamic-config.html#updating-ssl-keystore-of-an-existing-listener
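
For completeness, a minimal sketch of issuing the same incremental config update from the Java Admin client instead of the CLI. The broker id, file paths, and passwords below are placeholders; the SSL client properties play the role of the file passed via --command-config:

{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class AlterListenerKeystore {
    public static void main(String[] args) throws Exception {
        // Client-side SSL settings, i.e. what would normally live in the
        // --command-config properties file. All values are placeholders.
        Properties props = new Properties();
        props.put("bootstrap.servers", "hostname:9093");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "/etc/kafka/secrets/client.truststore.jks");
        props.put("ssl.truststore.password", "changeit");

        try (Admin admin = Admin.create(props)) {
            // Equivalent of: --entity-type brokers --entity-name 1001 --alter --add-config ...
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "1001");
            AlterConfigOp setKeystore = new AlterConfigOp(
                new ConfigEntry("listener.name.ssl.ssl.keystore.location",
                                "/etc/kafka/secrets/broker.keystore.jks"),
                AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Collections.singletonMap(broker, Collections.singletonList(setKeystore));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}
{code}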

> kafka-configs does not work with ssl enabled kafka broker.
> --
>
> Key: KAFKA-12534
> URL: https://issues.apache.org/jira/browse/KAFKA-12534
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.1
>Reporter: kaushik srinivas
>Priority: Critical
>
> We are trying to change the trust store password on the fly using the 
> kafka-configs script for a ssl enabled kafka broker.
> Below is the command used:
> kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers 
> --entity-name 1001 --alter --add-config 'ssl.truststore.password=xxx'
> But we see below error in the broker logs when the command is run.
> {"type":"log", "host":"kf-2-0", "level":"INFO", 
> "neid":"kafka-cfd5ccf2af7f47868e83473408", "system":"kafka", 
> "time":"2021-03-23T12:14:40.055", "timezone":"UTC", 
> "log":\{"message":"data-plane-kafka-network-thread-1002-ListenerName(SSL)-SSL-2
>  - org.apache.kafka.common.network.Selector - [SocketServer brokerId=1002] 
> Failed authentication with /127.0.0.1 (SSL handshake failed)"}}
>  How can anyone configure ssl certs for the kafka-configs script and succeed 
> with the ssl handshake in this case ? 
> Note : 
> We are trying with a single listener i.e SSL: 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12534) kafka-configs does not work with ssl enabled kafka broker.

2021-05-12 Thread Jordan Moore (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343196#comment-17343196
 ] 

Jordan Moore commented on KAFKA-12534:
--

It would appear my original comment has resolved your error and the title of this 
issue (you no longer get SSL handshake errors).

I'm not sure about the remaining issues. However, the error you posted does not 
include /etc/kafka/secrets in your properties, nor does it refer to a JKS file in 
the error message, so it's difficult to reproduce your error if you obfuscate 
filenames differently in the logs and properties.

> kafka-configs does not work with ssl enabled kafka broker.
> --
>
> Key: KAFKA-12534
> URL: https://issues.apache.org/jira/browse/KAFKA-12534
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.1
>Reporter: kaushik srinivas
>Priority: Critical
>
> We are trying to change the trust store password on the fly using the 
> kafka-configs script for a ssl enabled kafka broker.
> Below is the command used:
> kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers 
> --entity-name 1001 --alter --add-config 'ssl.truststore.password=xxx'
> But we see below error in the broker logs when the command is run.
> {"type":"log", "host":"kf-2-0", "level":"INFO", 
> "neid":"kafka-cfd5ccf2af7f47868e83473408", "system":"kafka", 
> "time":"2021-03-23T12:14:40.055", "timezone":"UTC", 
> "log":\{"message":"data-plane-kafka-network-thread-1002-ListenerName(SSL)-SSL-2
>  - org.apache.kafka.common.network.Selector - [SocketServer brokerId=1002] 
> Failed authentication with /127.0.0.1 (SSL handshake failed)"}}
>  How can anyone configure ssl certs for the kafka-configs script and succeed 
> with the ssl handshake in this case ? 
> Note : 
> We are trying with a single listener i.e SSL: 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12668) MockScheduler is not safe to use in concurrent code.

2021-05-12 Thread Maksim Iakunin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343191#comment-17343191
 ] 

Maksim Iakunin commented on KAFKA-12668:


[~jagsancio], hello!

Could you give me a test case in which the deadlock occurs?

I cannot reproduce the bug myself.

> MockScheduler is not safe to use in concurrent code.
> 
>
> Key: KAFKA-12668
> URL: https://issues.apache.org/jira/browse/KAFKA-12668
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Jose Armando Garcia Sancio
>Assignee: Maksim Iakunin
>Priority: Major
>  Labels: newbie
>
> The current implementation of {{MockScheduler}} executes tasks in the same 
> stack when {{schedule}} is called. This violates {{Log}}'s assumption since 
> {{Log}} calls {{schedule}} while holding a lock. This can cause deadlock in 
> tests.
> One solution is to change {{MockSchedule}} {{schedule}} method so that 
> {{tick}} is not called. {{tick}} should be called by a stack (thread) that 
> doesn't hold any locks.
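
A minimal, illustrative sketch of the two behaviours described above; this is not Kafka's actual MockScheduler code, just the shape of the proposed change (schedule() only enqueues, and tick() is driven by a stack that holds no locks):

{code:java}
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class SchedulerSketch {

    interface Scheduler {
        void schedule(Runnable task);
    }

    // Runs the task immediately on the caller's stack. If the caller holds a lock
    // (as Log does) and the task needs another lock, tests can deadlock.
    static class InlineScheduler implements Scheduler {
        @Override
        public void schedule(Runnable task) {
            task.run();
        }
    }

    // Only records the task; a separate tick() call, made from a thread that
    // holds no locks, actually executes it.
    static class DeferredScheduler implements Scheduler {
        private final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();

        @Override
        public void schedule(Runnable task) {
            pending.add(task);
        }

        void tick() {
            Runnable task;
            while ((task = pending.poll()) != null) {
                task.run();
            }
        }
    }

    public static void main(String[] args) {
        DeferredScheduler scheduler = new DeferredScheduler();
        synchronized (SchedulerSketch.class) {
            // The caller holds a lock here; with DeferredScheduler nothing runs yet.
            scheduler.schedule(() -> System.out.println("ran outside the caller's lock"));
        }
        scheduler.tick(); // executed later, with no caller locks held
    }
}
{code}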



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j

2021-05-12 Thread Jira
Jørgen created KAFKA-12774:
--

 Summary: kafka-streams 2.8: logging in uncaught-exceptionhandler 
doesn't go through log4j
 Key: KAFKA-12774
 URL: https://issues.apache.org/jira/browse/KAFKA-12774
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.8.0
Reporter: Jørgen


When an exception is handled by the uncaught-exception handler introduced in Kafka 
Streams 2.8, the logging of the stacktrace doesn't seem to go through the logging 
framework configured by the application (log4j2 in our case); instead it gets printed 
to the console "line-by-line".

All other exceptions logged by kafka-streams go through log4j2 and get formatted 
properly according to the log4j2 appender (JSON in our case). I haven't tested this 
on other frameworks like logback.

Application setup:
 * Spring-boot 2.4.5
 * Log4j 2.13.3
 * Slf4j 1.7.30

Log4j2 appender config: (the XML snippet is not preserved in the archived message)
Uncaught exception handler config:
{code:java}
kafkaStreams.setUncaughtExceptionHandler { exception ->
    logger.warn("Uncaught exception handled - replacing thread", exception) // logged properly
    StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
} {code}
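
For comparison, a plain-Java registration of the same handler (the logger and class names are placeholders); the warn() call itself goes through slf4j as expected:

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HandlerSetup {
    private static final Logger logger = LoggerFactory.getLogger(HandlerSetup.class);

    static void register(KafkaStreams kafkaStreams) {
        kafkaStreams.setUncaughtExceptionHandler(exception -> {
            // This line is routed through slf4j/log4j2.
            logger.warn("Uncaught exception handled - replacing thread", exception);
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        });
    }
}
{code}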
Stacktrace that gets printed line-by-line:
{code:java}
Exception in thread "xxx-f5860dff-9a41-490e-8ab0-540b1a7f9ce4-StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic xxx-repartition for task 3_2 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:226)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
	at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
	at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
	at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:783)
	at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:430)
	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:315)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id. {code}
 

It's a little bit hard to reproduce, as I haven't found any way to trigger the 
uncaught-exception handler through JUnit tests.

Link to discussion on slack: 
https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1620389197436700



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12534) kafka-configs does not work with ssl enabled kafka broker.

2021-05-12 Thread kaushik srinivas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343135#comment-17343135
 ] 

kaushik srinivas commented on KAFKA-12534:
--

Hi [~cricket007]

We have tried this out. File permissions are OK and the double slash has also been 
corrected. We still see the same error.

What do you suggest?

> kafka-configs does not work with ssl enabled kafka broker.
> --
>
> Key: KAFKA-12534
> URL: https://issues.apache.org/jira/browse/KAFKA-12534
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.1
>Reporter: kaushik srinivas
>Priority: Critical
>
> We are trying to change the trust store password on the fly using the 
> kafka-configs script for a ssl enabled kafka broker.
> Below is the command used:
> kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers 
> --entity-name 1001 --alter --add-config 'ssl.truststore.password=xxx'
> But we see below error in the broker logs when the command is run.
> {"type":"log", "host":"kf-2-0", "level":"INFO", 
> "neid":"kafka-cfd5ccf2af7f47868e83473408", "system":"kafka", 
> "time":"2021-03-23T12:14:40.055", "timezone":"UTC", 
> "log":\{"message":"data-plane-kafka-network-thread-1002-ListenerName(SSL)-SSL-2
>  - org.apache.kafka.common.network.Selector - [SocketServer brokerId=1002] 
> Failed authentication with /127.0.0.1 (SSL handshake failed)"}}
>  How can anyone configure ssl certs for the kafka-configs script and succeed 
> with the ssl handshake in this case ? 
> Note : 
> We are trying with a single listener i.e SSL: 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dongjinleekr opened a new pull request #10678: TRIVIAL: Fix type inconsistencies, unthrown exceptions, etc

2021-05-12 Thread GitBox


dongjinleekr opened a new pull request #10678:
URL: https://github.com/apache/kafka/pull/10678


   I found these glitches while working on 
[KAFKA-12768](https://issues.apache.org/jira/browse/KAFKA-12768), etc.
   
   Here are some explanations on the last commit:
   
   At present, `ConsoleConsumer` takes the timeout-ms parameter as an `Integer` (see 
`ConsumerConfig#timeoutMsOpt`). For this reason, `ConsumerConfig#timeoutMs` is an 
`Integer`, and in turn the `timeoutMs` variable in `ConsoleConsumer#run` becomes 
`Any`, since it can be either an `Integer` or a `Long`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] DuongPTIT opened a new pull request #10677: KAFKA-12380 Executor in Connect's Worker is not shut down when the worker is

2021-05-12 Thread GitBox


DuongPTIT opened a new pull request #10677:
URL: https://github.com/apache/kafka/pull/10677


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10673: MINOR: set replication.factor to 1 to make StreamsBrokerCompatibility…

2021-05-12 Thread GitBox


chia7712 commented on a change in pull request #10673:
URL: https://github.com/apache/kafka/pull/10673#discussion_r630800233



##
File path: tests/kafkatest/services/streams.py
##
@@ -466,6 +466,15 @@ def __init__(self, test_context, kafka, processingMode):
 
"org.apache.kafka.streams.tests.BrokerCompatibilityTest",
 processingMode)
 
+def prop_file(self):
+    properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+                  streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
+                  # the old broker (< 2.4) does not support configuration replication.factor=-1
+                  "replication.factor": 1}

Review comment:
   > Should we set it to 3 instead? IIRC, we run all system tests with 3 
brokers?
   
   Not really. `streams_broker_compatibility_test.py` runs the test with a single broker 
   (https://github.com/apache/kafka/blob/trunk/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py#L45)
   
   > Might be better to just set it in the Java code instead as it's easier to 
find and read, and I believe most other configs are set there. 
   
   I prefer to change the Python code rather than the Java code, since the number of 
brokers is tied to the replication factor. If we hardcode it (i.e. 
`replication.factor = 1`) in the Java class, it is hard to keep the two in sync 
from the Python side.
   
   >  I think this test runs the StreamsSmokeTest?
   
   BrokerCompatibilityTest 
(https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/streams.py#L466)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #10671: MINOR: exclude all `src/generated` and `src/generated-test`

2021-05-12 Thread GitBox


chia7712 merged pull request #10671:
URL: https://github.com/apache/kafka/pull/10671


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10671: MINOR: exclude all `src/generated` and `src/generated-test`

2021-05-12 Thread GitBox


chia7712 commented on pull request #10671:
URL: https://github.com/apache/kafka/pull/10671#issuecomment-839530020


   > Thanks @chia7712 . Can you verify this by creating some test 
directories/file that follow this pattern, and make sure they don't show up? I 
had a vague impression that .gitignore files don't allow regular regexes and 
followed some niche syntax instead, but I'm not sure.
   
   the gitignore documentation (https://git-scm.com/docs/gitignore) has the following explanation:
   `
   A leading "**" followed by a slash means match in all directories. For 
example, "**/foo" matches file or directory "foo" anywhere, the same as pattern 
"foo". "**/foo/bar" matches file or directory "bar" anywhere that is directly 
under directory "foo".
   `
   
   I added files to `src/generated` and `aaa/src/generated`, and they are excluded.
   
   https://user-images.githubusercontent.com/6234750/117934892-b7030880-b335-11eb-9e4c-a25aec6f399c.png
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future

2021-05-12 Thread Ewen Cheslack-Postava (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343043#comment-17343043
 ] 

Ewen Cheslack-Postava commented on KAFKA-3539:
--

[~mjsax] They aren't really duplicates. It's possible to also just have races 
between whatever creates a topic and a process that tries to produce to it. I 
think 3450 implicitly assumes a simple sequential model where create should 
complete, and only then processes are deployed. In practice, there are lots of 
convergence based systems that can do without this with the right preflight 
checks (topic creation may happen in parallel to deploying an app that uses it, 
but does not have permissions to create topics). They may be related, but at 
least the high level description of 3450 wrt topic auto creation doesn't quite 
align with this one.

 

[~moses.nakamura] this is a very tricky thing to get right. At first glance we 
should make anything blocking (or with simple timeout) just be tied to the 
producer constructor, but at that point we don't know what info to get. Adding 
a queue can help, but adds overhead and really just delays the issue. 
Configurable timeouts allow you to set your tolerance for blocking operations 
(you're going to spend at least some time in the send() call anyway). 
Alternatively, we could not wait on metadata at all and only check if it's 
available, request it if it is not, and then bail with a RetriableException.

Pretty much any solution proposed so far is going to need a KIP (it's at a 
minimum a behavioral change, for your option 1 it's an API addition). A good 
starting point if you want to address this is to enumerate the variety of 
options and their pros/cons/compatibility issues.
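
To make the "configurable timeouts" option concrete, a small sketch using the existing max.block.ms producer config to cap how long send() may block waiting for metadata. The broker address and topic name are placeholders:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class BoundedSendBlocking {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Upper bound on how long send() may block waiting for metadata
        // (or for buffer space) before throwing a TimeoutException.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            try {
                producer.send(new ProducerRecord<>("some-topic", "key", "value"));
            } catch (TimeoutException e) {
                // Metadata was not available within max.block.ms; the call returned
                // instead of blocking indefinitely.
                System.err.println("send() gave up waiting for metadata: " + e.getMessage());
            }
        }
    }
}
{code}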

> KafkaProducer.send() may block even though it returns the Future
> 
>
> Key: KAFKA-3539
> URL: https://issues.apache.org/jira/browse/KAFKA-3539
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Oleg Zhurakousky
>Priority: Critical
>  Labels: needs-discussion, needs-kip
>
> You can get more details from the us...@kafka.apache.org by searching on the 
> thread with the subject "KafkaProducer block on send".
> The bottom line is that method that returns Future must never block, since it 
> essentially violates the Future contract as it was specifically designed to 
> return immediately passing control back to the user to check for completion, 
> cancel etc.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] holmofy commented on pull request #161: KAFKA-1543: Changing replication factor

2021-05-12 Thread GitBox


holmofy commented on pull request #161:
URL: https://github.com/apache/kafka/pull/161#issuecomment-839495865


   Excuse me, is this feature supported now?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2021-05-12 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343028#comment-17343028
 ] 

Ning Zhang commented on KAFKA-12635:


Backport is a great point. I guess the review cycle will probably take 3-4 weeks at 
most, so I will let the committer/reviewer know about the backport option and see 
what is feasible from their point of view.

> Mirrormaker 2 offset sync is incorrect if the target partition is empty
> ---
>
> Key: KAFKA-12635
> URL: https://issues.apache.org/jira/browse/KAFKA-12635
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Frank Yi
>Assignee: Ning Zhang
>Priority: Major
>
> This bug occurs when using Mirrormaker with "sync.group.offsets.enabled = 
> true".
> If a source partition is empty, but the source consumer group's offset for 
> that partition is non-zero, then Mirrormaker sets the target consumer group's 
> offset for that partition to the literal, not translated, offset of the 
> source consumer group. This state can be reached if the source consumer group 
> consumed some records that were now deleted (like by a retention policy), or 
> if Mirrormaker replication is set to start at "latest". This bug causes the 
> target consumer group's lag for that partition to be negative and breaks 
> offset sync for that partition until lag is positive.
> The correct behavior when the source partition is empty would be to set the 
> target offset to the translated offset, not literal offset, which in this 
> case would always be 0. 
> Original email thread on this issue: 
> https://lists.apache.org/thread.html/r7c54ee5f57227367b911d4abffa72781772d8dd3b72d75eb65ee19f7%40%3Cusers.kafka.apache.org%3E
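
An illustrative sketch of the decision described above; this is not MirrorMaker 2's actual code, just the intended behaviour (commit the translated offset, which is 0 when the target partition is empty, rather than copying the source group's literal offset):

{code:java}
import java.util.OptionalLong;

public class OffsetSyncSketch {

    /**
     * Decide what offset to commit for the target consumer group. When no
     * translation exists because the target partition is empty, the translated
     * position is the start of the partition (0), not the literal source offset.
     */
    static long targetOffsetToCommit(long sourceGroupOffset, OptionalLong translatedOffset) {
        // Copying sourceGroupOffset verbatim is the buggy behaviour described in
        // this ticket: it produces negative lag on an empty target partition.
        return translatedOffset.orElse(0L);
    }

    public static void main(String[] args) {
        // Source group is at offset 42, but no offset sync exists yet: commit 0, not 42.
        System.out.println(targetOffsetToCommit(42L, OptionalLong.empty())); // 0
        System.out.println(targetOffsetToCommit(42L, OptionalLong.of(17L))); // 17
    }
}
{code}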



--
This message was sent by Atlassian Jira
(v8.3.4#803005)