[GitHub] [kafka] dajac commented on pull request #7265: Don't merge this, just testing something in the build

2020-10-23 Thread GitBox


dajac commented on pull request #7265:
URL: https://github.com/apache/kafka/pull/7265#issuecomment-715752369


   @mumrah I guess that we could close this one, couldn't we?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac closed pull request #7336: KAFKA-8107:Flaky Test kafka.api.ClientIdQuotaTest.testQuotaOverrideDe…

2020-10-23 Thread GitBox


dajac closed pull request #7336:
URL: https://github.com/apache/kafka/pull/7336


   







[GitHub] [kafka] dajac commented on pull request #7336: KAFKA-8107:Flaky Test kafka.api.ClientIdQuotaTest.testQuotaOverrideDe…

2020-10-23 Thread GitBox


dajac commented on pull request #7336:
URL: https://github.com/apache/kafka/pull/7336#issuecomment-715749977


   Closing as this has been fixed by https://github.com/apache/kafka/pull/8394.







[jira] [Commented] (KAFKA-10639) There should be an EnvironmentConfigProvider that will do variable substitution using environment variable.

2020-10-23 Thread Brad Davis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17220011#comment-17220011
 ] 

Brad Davis commented on KAFKA-10639:


I've actually already locally implemented this, but want to contribute it back 
upstream.  The limiting factor is primarily making sure that there are 
appropriate tests around it and ensuring conformance to existing coding 
standards.

> There should be an EnvironmentConfigProvider that will do variable 
> substitution using environment variable.
> ---
>
> Key: KAFKA-10639
> URL: https://issues.apache.org/jira/browse/KAFKA-10639
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 2.5.1
>Reporter: Brad Davis
>Priority: Major
>
> Running Kafka Connect in the same docker container in multiple stages (like 
> dev vs production) means that a file based approach to secret hiding using 
> the file config provider isn't viable.  However, docker container instances 
> can have their environment variables customized on a per-container basis, and 
> our existing tech stack typically exposes per-stage secrets (like the dev DB 
> password vs the prod DB password) through env vars within the containers.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10639) There should be an EnvironmentConfigProvider that will do variable substitution using environment variable.

2020-10-23 Thread Brad Davis (Jira)
Brad Davis created KAFKA-10639:
--

 Summary: There should be an EnvironmentConfigProvider that will do 
variable substitution using environment variable.
 Key: KAFKA-10639
 URL: https://issues.apache.org/jira/browse/KAFKA-10639
 Project: Kafka
  Issue Type: Improvement
  Components: config
Affects Versions: 2.5.1
Reporter: Brad Davis


Running Kafka Connect in the same docker container in multiple stages (like dev 
vs production) means that a file based approach to secret hiding using the file 
config provider isn't viable.  However, docker container instances can have 
their environment variables customized on a per-container basis, and our 
existing tech stack typically exposes per-stage secrets (like the dev DB 
password vs the prod DB password) through env vars within the containers.
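
A minimal sketch of what such a provider could look like, using Kafka's ConfigProvider SPI (org.apache.kafka.common.config.provider.ConfigProvider). The class name and behaviour below are illustrative assumptions, not the implementation the reporter mentions having written:

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.ConfigProvider;

// Hypothetical provider that resolves configuration values from environment variables.
public class EnvironmentConfigProvider implements ConfigProvider {

    @Override
    public void configure(Map<String, ?> configs) {
        // No provider-level configuration is needed for this sketch.
    }

    @Override
    public ConfigData get(String path) {
        // The path is ignored; expose all environment variables.
        return new ConfigData(new HashMap<>(System.getenv()));
    }

    @Override
    public ConfigData get(String path, Set<String> keys) {
        // Return only the requested keys, resolving each one from the environment.
        Map<String, String> data = new HashMap<>();
        for (String key : keys) {
            String value = System.getenv(key);
            if (value != null) {
                data.put(key, value);
            }
        }
        return new ConfigData(data);
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}

Wiring would presumably follow the existing provider mechanism (e.g. config.providers=env and config.providers.env.class set to the provider class), after which a connector configuration could reference values such as ${env::DB_PASSWORD}.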

 

 





[GitHub] [kafka] RamanVerma commented on a change in pull request #9364: KAFKA-10471 Mark broker crash during log loading as unclean shutdown

2020-10-23 Thread GitBox


RamanVerma commented on a change in pull request #9364:
URL: https://github.com/apache/kafka/pull/9364#discussion_r511277713



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -298,26 +300,38 @@ class LogManager(logDirs: Seq[File],
   /**
* Recover and load all logs in the given data directories
*/
-  private def loadLogs(): Unit = {
+  private[log] def loadLogs(): Unit = {
 info(s"Loading logs from log dirs $liveLogDirs")
 val startMs = time.hiResClockMs()
 val threadPools = ArrayBuffer.empty[ExecutorService]
 val offlineDirs = mutable.Set.empty[(String, IOException)]
-val jobs = mutable.Map.empty[File, Seq[Future[_]]]
+val jobs = ArrayBuffer.empty[Seq[Future[_]]]
 var numTotalLogs = 0
 
 for (dir <- liveLogDirs) {
   val logDirAbsolutePath = dir.getAbsolutePath
+  var hadCleanShutdown: Boolean = false
   try {
 val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
 threadPools.append(pool)
 
 val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
 if (cleanShutdownFile.exists) {
   info(s"Skipping recovery for all logs in $logDirAbsolutePath since 
clean shutdown file was found")
+  // Cache the clean shutdown status and use that for rest of log 
loading workflow. Delete the CleanShutdownFile
+  // so that if broker crashes while loading the log, it is considered 
hard shutdown during the next boot up. KAFKA-10471
+  try {
+cleanShutdownFile.delete()
+  } catch {
+case e: IOException =>

Review comment:
   The `java.nio.file.Files` API throws `IOException`, but `java.io.File` (the one used here) does not. Will remove this exception.
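
For reference, a small standalone sketch (class name illustrative) of the difference in delete semantics between the two APIs:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class DeleteSemantics {
    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("clean-shutdown", ".tmp");

        // java.io.File.delete() never throws IOException; it simply returns false on failure.
        boolean deleted = f.delete();
        System.out.println("File.delete() returned " + deleted);

        // java.nio.file.Files.delete(Path) does throw IOException (here a NoSuchFileException,
        // since the file was already removed above), so catching it is meaningful.
        try {
            Files.delete(f.toPath());
        } catch (IOException e) {
            System.out.println("Files.delete threw: " + e);
        }
    }
}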









[GitHub] [kafka] stan-confluent commented on pull request #9488: Pin ducktape to version 0.7.10

2020-10-23 Thread GitBox


stan-confluent commented on pull request #9488:
URL: https://github.com/apache/kafka/pull/9488#issuecomment-715656921


   https://github.com/apache/kafka/pull/9490







[GitHub] [kafka] stan-confluent commented on pull request #9490: Pin ducktape to version 0.7.10

2020-10-23 Thread GitBox


stan-confluent commented on pull request #9490:
URL: https://github.com/apache/kafka/pull/9490#issuecomment-715656906


   cc @omkreddy @edenhill @sdanduConf @andrewegel 







[GitHub] [kafka] stan-confluent opened a new pull request #9490: Pin ducktape to version 0.7.10

2020-10-23 Thread GitBox


stan-confluent opened a new pull request #9490:
URL: https://github.com/apache/kafka/pull/9490


   Ducktape version 0.7.10 pinned paramiko to version 2.3.2 to deal with the random SSHExceptions Confluent had been seeing since ducktape was updated to a later version of paramiko.
   
   The idea is that we can backport the ducktape 0.7.10 change as far back as possible, while 2.7 and trunk can update to 0.8.0 and python3 separately.
   
   Tested:
   In progress, but unlikely to affect anything, since the only difference between ducktape 0.7.9 and 0.7.10 is the paramiko version downgrade.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] warrenzhu25 opened a new pull request #9491: KAFKA-10623: Refactor code to avoid discovery conflicts for admin.VersionRange

2020-10-23 Thread GitBox


warrenzhu25 opened a new pull request #9491:
URL: https://github.com/apache/kafka/pull/9491


   Rename admin.VersionRange into admin.Versions to avoid discovery conflicts
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] stan-confluent commented on pull request #9488: Pin ducktape to version 0.7.10

2020-10-23 Thread GitBox


stan-confluent commented on pull request #9488:
URL: https://github.com/apache/kafka/pull/9488#issuecomment-715656527


   @omkreddy sounds good, let me also re-run the tests on 2.6, just to be sure.







[GitHub] [kafka] stan-confluent closed pull request #9488: Pin ducktape to version 0.7.10

2020-10-23 Thread GitBox


stan-confluent closed pull request #9488:
URL: https://github.com/apache/kafka/pull/9488


   







[GitHub] [kafka] omkreddy edited a comment on pull request #9488: Pin ducktape to version 0.7.10

2020-10-23 Thread GitBox


omkreddy edited a comment on pull request #9488:
URL: https://github.com/apache/kafka/pull/9488#issuecomment-715651991


   @stan-confluent Yes, Please update the PR and base against 2.6.







[GitHub] [kafka] omkreddy edited a comment on pull request #9488: Pin ducktape to version 0.7.10

2020-10-23 Thread GitBox


omkreddy edited a comment on pull request #9488:
URL: https://github.com/apache/kafka/pull/9488#issuecomment-715651991


   @stan-confluent Yes, Please update the PR and re-point to 2.6.







[GitHub] [kafka] omkreddy commented on pull request #9488: Pin ducktape to version 0.7.10

2020-10-23 Thread GitBox


omkreddy commented on pull request #9488:
URL: https://github.com/apache/kafka/pull/9488#issuecomment-715651991


   @stan-confluent Yes, Pls update the PR and re-point to 2.6.







[GitHub] [kafka] stan-confluent commented on pull request #9488: Pin ducktape to version 0.7.10

2020-10-23 Thread GitBox


stan-confluent commented on pull request #9488:
URL: https://github.com/apache/kafka/pull/9488#issuecomment-715651165


   > We have PR #9480 to update the version in setup.py. I plan to commit #9480 to trunk and 2.7 branches.
   > 
   > I think we can update to ducktape 0.7.10 on 2.6 and below.
   
   Awesome, thanks Manikumar. Do you want me to re-point this PR to 2.6 branch?







[GitHub] [kafka] stan-confluent commented on a change in pull request #9480: KAFKA-10592: Fix vagrant for a system tests with python3

2020-10-23 Thread GitBox


stan-confluent commented on a change in pull request #9480:
URL: https://github.com/apache/kafka/pull/9480#discussion_r511233150



##
File path: tests/setup.py
##
@@ -51,7 +51,7 @@ def run_tests(self):
   license="apache2.0",
   packages=find_packages(),
   include_package_data=True,
-  install_requires=["ducktape==0.7.9", "requests==2.22.0"],
+  install_requires=["ducktape==0.8.0", "requests==2.24.0"],

Review comment:
   Why do we need to pin `requests` explicitly?









[GitHub] [kafka] omkreddy edited a comment on pull request #9488: Pin ducktape to version 0.7.10

2020-10-23 Thread GitBox


omkreddy edited a comment on pull request #9488:
URL: https://github.com/apache/kafka/pull/9488#issuecomment-715649754


   We have PR https://github.com/apache/kafka/pull/9480 to update the version in setup.py. I plan to commit #9480 to trunk and 2.7 branches.
   
   I think we can update to ducktape 0.7.10 on 2.6 and below.







[GitHub] [kafka] omkreddy edited a comment on pull request #9488: Pin ducktape to version 0.7.10

2020-10-23 Thread GitBox


omkreddy edited a comment on pull request #9488:
URL: https://github.com/apache/kafka/pull/9488#issuecomment-715649754


   We have a PR https://github.com/apache/kafka/pull/9480 to update the version in setup.py. I plan to commit #9480 to trunk and 2.7 branches.
   
   I think we can update to ducktape 0.7.10 on 2.6 and below.







[GitHub] [kafka] omkreddy commented on pull request #9488: Pin ducktape to version 0.7.10

2020-10-23 Thread GitBox


omkreddy commented on pull request #9488:
URL: https://github.com/apache/kafka/pull/9488#issuecomment-715649754


   We have a PR https://github.com/apache/kafka/pull/9480 to update the version in setup.py. I plan to commit #9480 to trunk and 2.7 branches.
   
   I think we can update to ducktape 0.7.10 on 2.6 and below.







[GitHub] [kafka] ableegoldman commented on pull request #9489: MINOR: demote "Committing task offsets" log to DEBUG

2020-10-23 Thread GitBox


ableegoldman commented on pull request #9489:
URL: https://github.com/apache/kafka/pull/9489#issuecomment-715639072


   @guozhangwang @vvcephei  WDYT? I'm generally all for more logs but this is 
pretty extreme  







[GitHub] [kafka] ableegoldman opened a new pull request #9489: MINOR: demote "Committing task offsets" log to DEBUG

2020-10-23 Thread GitBox


ableegoldman opened a new pull request #9489:
URL: https://github.com/apache/kafka/pull/9489


   This message absolutely floods the logs, especially in an EOS application where the commit interval is just 100 ms. It's definitely a useful message, but I don't think there's any justification for it being at the INFO level when it's logged 10 times a second. 







[jira] [Updated] (KAFKA-10545) Create topic IDs and propagate to brokers

2020-10-23 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-10545:
---
Description: 
First step for KIP-516

The goals are:
 * Create and store topic IDs in a ZK Node and controller memory.
 * Propagate topic ID to brokers with updated LeaderAndIsrRequest, 
UpdateMetadata
 * Store topic ID in memory on broker, persistent file in log

  was:
First step for KIP-516

The goals are:
 * Create and store topic IDs in a ZK Node and controller memory.
 * Propagate topic ID to brokers with updated LeaderAndIsrRequest
 * Store topic ID in memory on broker, persistent file in log


> Create topic IDs and propagate to brokers
> -
>
> Key: KAFKA-10545
> URL: https://issues.apache.org/jira/browse/KAFKA-10545
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> First step for KIP-516
> The goals are:
>  * Create and store topic IDs in a ZK Node and controller memory.
>  * Propagate topic ID to brokers with updated LeaderAndIsrRequest, 
> UpdateMetadata
>  * Store topic ID in memory on broker, persistent file in log





[jira] [Updated] (KAFKA-10547) Add topic IDs to MetadataResponse

2020-10-23 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-10547:
---
Summary: Add topic IDs to MetadataResponse  (was: Add topic IDs to 
MetadataResponse, UpdateMetadata)

> Add topic IDs to MetadataResponse
> -
>
> Key: KAFKA-10547
> URL: https://issues.apache.org/jira/browse/KAFKA-10547
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Priority: Major
>
> Prevent reads from deleted topics
> Will be able to use TopicDescription to identify the topic ID





[GitHub] [kafka] stan-confluent commented on pull request #9488: Pin ducktape to version 0.7.10

2020-10-23 Thread GitBox


stan-confluent commented on pull request #9488:
URL: https://github.com/apache/kafka/pull/9488#issuecomment-715636766


   cc @omkreddy @edenhill - folks, I think you've been working on updating 
kafkatest to python 3, so adding you as reviewers.
   also cc @andrewegel and @sdanduConf







[GitHub] [kafka] stan-confluent opened a new pull request #9488: Pin ducktape to version 0.7.10

2020-10-23 Thread GitBox


stan-confluent opened a new pull request #9488:
URL: https://github.com/apache/kafka/pull/9488


   Ducktape version 0.7.10 pinned paramiko to version 2.3.2 to deal with the random `SSHException`s Confluent had been seeing since ducktape was updated to a later version of paramiko.
   This PR pins both the version in `setup.py` and the one in `ducker-ak`'s `Dockerfile` to the same version. Previously the ducker version was pinned to 0.8.0.
   I'll send a separate PR that pins both versions to 0.8.0 (unless someone is already working on that separately) - the idea is that we can backport the ducktape 0.7.10 change as far back as possible, while keeping the ducktape 0.8.0 change in trunk only (unless we plan to backport the python3 changes).
   
   Tested:
   
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-10-23--001.1603440055--stan-confluent--ducktape-710--3c51234d0/report.html
   
   There are 10 failing tests, 9 of which were already failing before this change, and 1 (streams_upgrade_test) that I've seen failing on different branches recently as well, though it was passing on trunk as of a couple of days ago. Since the ducktape version bump only changed the paramiko dependency, I don't think it could have affected it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-23 Thread GitBox


hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r511200238



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.raft.RecordSerde;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * TODO: Also flush after minimum size limit is reached?
+ */
+public class BatchAccumulator<T> implements Closeable {
+private final int epoch;
+private final Time time;
+private final Timer lingerTimer;
+private final int lingerMs;
+private final int maxBatchSize;
+private final CompressionType compressionType;
+private final MemoryPool memoryPool;
+private final ReentrantLock lock;
+private final RecordSerde<T> serde;
+
+private long nextOffset;
+private BatchBuilder<T> currentBatch;
+private List<CompletedBatch<T>> completed;
+
+public BatchAccumulator(
+int epoch,
+long baseOffset,
+int lingerMs,
+int maxBatchSize,
+MemoryPool memoryPool,
+Time time,
+CompressionType compressionType,
+RecordSerde<T> serde
+) {
+this.epoch = epoch;
+this.lingerMs = lingerMs;
+this.maxBatchSize = maxBatchSize;
+this.memoryPool = memoryPool;
+this.time = time;
+this.lingerTimer = time.timer(lingerMs);
+this.compressionType = compressionType;
+this.serde = serde;
+this.nextOffset = baseOffset;
+this.completed = new ArrayList<>();
+this.lock = new ReentrantLock();
+}
+
+/**
+ * Append a list of records into an atomic batch. We guarantee all records
+ * are included in the same underlying record batch so that either all of
+ * the records become committed or none of them do.
+ *
+ * @param epoch the expected leader epoch
+ * @param records the list of records to include in a batch
+ * @return the offset of the last message or {@link Long#MAX_VALUE} if the 
epoch
+ * does not match
+ */
+public Long append(int epoch, List<T> records) {
+if (epoch != this.epoch) {
+// If the epoch does not match, then the state machine probably
+// has not gotten the notification about the latest epoch change.
+// In this case, ignore the append and return a large offset value
+// which will never be committed.

Review comment:
   Right. It is important to ensure that the state machine has observed the 
latest leader epoch. Otherwise there may be committed data inflight which the 
state machine has yet to see.
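
A stripped-down sketch of that guard (names here are illustrative, not the actual BatchAccumulator code): a caller whose leader epoch is stale receives a sentinel offset that can never be reached, so its append can never appear committed.

// Hypothetical, simplified illustration of the epoch check performed in append().
final class EpochGuardedAppender {
    private final int leaderEpoch;
    private long nextOffset;

    EpochGuardedAppender(int leaderEpoch, long baseOffset) {
        this.leaderEpoch = leaderEpoch;
        this.nextOffset = baseOffset;
    }

    // Returns the offset assigned to the record, or Long.MAX_VALUE if the caller's epoch is
    // stale, i.e. the state machine has not yet observed the latest epoch change.
    synchronized long append(int callerEpoch, String record) {
        if (callerEpoch != leaderEpoch) {
            return Long.MAX_VALUE;
        }
        // ... serialize and buffer the record here ...
        return nextOffset++;
    }
}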









[jira] [Commented] (KAFKA-10633) Constant probing rebalances in Streams 2.6

2020-10-23 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219963#comment-17219963
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10633:


The issue with directory contents being deleted but not the directories 
themselves sounds like KAFKA-10564 (also fixed in 2.7.0/2.6.1). I don't believe 
that particular bug has any real implications, other than being 
annoying/misleading in the logs – it should still delete everything in the task 
directory, including the checkpoint file which is how the assignor determines 
which persistent state is/isn't on an instance.

I'm kind of surprised that you would still get a rebalance after redeploying 
like that when using static membership. Unless it takes longer than the static 
group membership timeout I guess. To be honest, I'm not that familiar with the 
specifics of static membership in general – maybe [~bchen225242] can chime in 
here. But I suppose I would start by checking out the logs on the Streams side 
after it comes back up, and see if it's logged any reason for triggering a 
rebalance explicitly.

(There are a few reasons to trigger a rebalance after a static member is bounced, for example if its hostname changed for IQ. If anything like that happened, it should be logged clearly.)

> Constant probing rebalances in Streams 2.6
> --
>
> Key: KAFKA-10633
> URL: https://issues.apache.org/jira/browse/KAFKA-10633
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Bradley Peterson
>Priority: Major
> Attachments: Discover 2020-10-21T23 34 03.867Z - 2020-10-21T23 44 
> 46.409Z.csv
>
>
> We are seeing a few issues with the new rebalancing behavior in Streams 2.6. 
> This ticket is for constant probing rebalances on one StreamThread, but I'll 
> mention the other issues, as they may be related.
> First, when we redeploy the application we see tasks being moved, even though 
> the task assignment was stable before redeploying. We would expect to see 
> tasks assigned back to the same instances and no movement. The application is 
> in EC2, with persistent EBS volumes, and we use static group membership to 
> avoid rebalancing. To redeploy the app we terminate all EC2 instances. The 
> new instances will reattach the EBS volumes and use the same group member id.
> After redeploying, we sometimes see the group leader go into a tight probing 
> rebalance loop. This doesn't happen immediately, it could be several hours 
> later. Because the redeploy caused task movement, we see expected probing 
> rebalances every 10 minutes. But, then one thread will go into a tight loop 
> logging messages like "Triggering the followup rebalance scheduled for 
> 1603323868771 ms.", handling the partition assignment (which doesn't change), 
> then "Requested to schedule probing rebalance for 1603323868771 ms." This 
> repeats several times a second until the app is restarted again. I'll attach 
> a log export from one such incident.





[GitHub] [kafka] bbejeck edited a comment on pull request #9486: KAFKA-9381: Fix releaseTarGz to publish valid scaladoc files

2020-10-23 Thread GitBox


bbejeck edited a comment on pull request #9486:
URL: https://github.com/apache/kafka/pull/9486#issuecomment-715620710


   Need to test locally that the `uploadArchives` task triggers `archive` 
blocks in the build.gradle file.  If correct then the 
`kafka-streams-scala-scaladoc.jar` should get created properly, and with 
scaladoc disabled for core, we won't publish non-public scaladoc







[GitHub] [kafka] bbejeck commented on pull request #9486: KAFKA-9381: Fix releaseTarGz to publish valid scaladoc files

2020-10-23 Thread GitBox


bbejeck commented on pull request #9486:
URL: https://github.com/apache/kafka/pull/9486#issuecomment-715620710


   Need to test locally that the `uploadArchives` task triggers `archive` 
blocks in the build.gradle file.  If correct then the 
`kafka-streams-scala-scaladoc.jar` should get created properly







[jira] [Commented] (KAFKA-10633) Constant probing rebalances in Streams 2.6

2020-10-23 Thread Bradley Peterson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219958#comment-17219958
 ] 

Bradley Peterson commented on KAFKA-10633:
--

Sophie (et al.), do you have any thoughts about what would cause rebalances 
(and task movements) after redeploying? If we could fix that, then this would 
be less of a problem, because we would rarely have probing rebalances. We do 
have another problem where state directories are not deleted, but their 
contents are (almost like KAFKA-6647, but not quite the same). Is it possible 
that is confusing the task assignor?

> Constant probing rebalances in Streams 2.6
> --
>
> Key: KAFKA-10633
> URL: https://issues.apache.org/jira/browse/KAFKA-10633
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Bradley Peterson
>Priority: Major
> Attachments: Discover 2020-10-21T23 34 03.867Z - 2020-10-21T23 44 
> 46.409Z.csv
>
>
> We are seeing a few issues with the new rebalancing behavior in Streams 2.6. 
> This ticket is for constant probing rebalances on one StreamThread, but I'll 
> mention the other issues, as they may be related.
> First, when we redeploy the application we see tasks being moved, even though 
> the task assignment was stable before redeploying. We would expect to see 
> tasks assigned back to the same instances and no movement. The application is 
> in EC2, with persistent EBS volumes, and we use static group membership to 
> avoid rebalancing. To redeploy the app we terminate all EC2 instances. The 
> new instances will reattach the EBS volumes and use the same group member id.
> After redeploying, we sometimes see the group leader go into a tight probing 
> rebalance loop. This doesn't happen immediately, it could be several hours 
> later. Because the redeploy caused task movement, we see expected probing 
> rebalances every 10 minutes. But, then one thread will go into a tight loop 
> logging messages like "Triggering the followup rebalance scheduled for 
> 1603323868771 ms.", handling the partition assignment (which doesn't change), 
> then "Requested to schedule probing rebalance for 1603323868771 ms." This 
> repeats several times a second until the app is restarted again. I'll attach 
> a log export from one such incident.





[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-23 Thread GitBox


rajinisivaram commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r511174838



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -770,7 +770,7 @@ class ReplicaManager(val config: KafkaConfig,
 logManager.abortAndPauseCleaning(topicPartition)
 
 val initialFetchState = 
InitialFetchState(BrokerEndPoint(config.brokerId, "localhost", -1),
-  partition.getLeaderEpoch, futureLog.highWatermark)
+  partition.getLeaderEpoch, futureLog.highWatermark, 
lastFetchedEpoch = None)

Review comment:
   Looking at this again, I think a bit more work is required to set the 
offsets and epoch correctly for AlterLogDirsThread in order to use 
`lastFetchedEpoch`. So I have reverted the changes for 
ReplicaAlterLogDirsThread. Will do that in a follow-on PR instead. In this PR, 
we will use the old truncation path in this case.









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331 add a streams handler

2020-10-23 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r511149017



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -346,26 +351,92 @@ public void setStateListener(final 
KafkaStreams.StateListener listener) {
  * Set the handler invoked when a {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
  * terminates due to an uncaught exception.
  *
- * @param eh the uncaught exception handler for all internal threads; 
{@code null} deletes the current handler
+ * @param uncaughtExceptionHandler the uncaught exception handler for all 
internal threads; {@code null} deletes the current handler
  * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ *
+ * @Deprecated Since 2.7.0. Use {@link 
KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)} 
instead.
+ *
  */
-public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh) {
+public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
 synchronized (stateLock) {
 if (state == State.CREATED) {
 for (final StreamThread thread : threads) {
-thread.setUncaughtExceptionHandler(eh);
+
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
 }
 
 if (globalStreamThread != null) {
-globalStreamThread.setUncaughtExceptionHandler(eh);
+
globalStreamThread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
 }
 } else {
 throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
-"Current state is: " + state);
+"Current state is: " + state);
+}
+}
+}
+
+/**
+ * Set the handler invoked when a {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a 
NullPointerException thrown from your processor
+ * logic.
+ * 
+ * Note, this handler must be threadsafe, since it will be shared among 
all threads, and invoked from any
+ * thread that encounters such an exception.
+ *
+ * @param streamsUncaughtExceptionHandler the uncaught exception handler 
of type {@link StreamsUncaughtExceptionHandler} for all internal threads; 
{@code null} deletes the current handler
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ * @throws NullPointerException @NotNull if 
streamsUncaughtExceptionHandler is null.
+ */
+public void setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler handler = exception -> 
handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+synchronized (stateLock) {
+if (state == State.CREATED) {
+Objects.requireNonNull(streamsUncaughtExceptionHandler);
+for (final StreamThread thread : threads) {
+thread.setStreamsUncaughtExceptionHandler(handler);
+}
+if (globalStreamThread != null) {
+globalStreamThread.setUncaughtExceptionHandler(handler);
+}
+} else {
+throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
+"Current state is: " + state);
 }
 }
 }
 
+private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
handleStreamsUncaughtException(final Throwable e,
+   
  final StreamsUncaughtExceptionHandler 
streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
action = streamsUncaughtExceptionHandler.handle(e);
+switch (action) {
+//case REPLACE_STREAM_THREAD:
+//log.error("Encountered the following exception during 
processing " +
+//"and the the stream thread will be replaced: ", e);
+//this.addStreamsThread();
+//break;
+case SHUTDOWN_CLIENT:
+log.error("Encountered the following exception during 
processing " +
+"and the client is going to shut down: ", e);
+close(Duration.ZERO);
+break;
+case SHUTDOWN_APPLICATION:
+  
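
For context, a hypothetical application-side registration of the handler added in this PR. The handler interface and response enum follow the diff above, while the package location, topology, and configuration values are illustrative assumptions rather than the finalized API:

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

public class HandlerRegistrationSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "handler-demo");      // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // trivial pass-through topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Register the new handler while the instance is still in CREATED state; returning
        // SHUTDOWN_CLIENT asks Streams to close this client, per the switch statement above.
        streams.setUncaughtExceptionHandler(exception ->
            StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);

        streams.start();
    }
}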

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331 add a streams handler

2020-10-23 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r511148601



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -782,7 +849,12 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 cacheSizePerThread,
 stateDirectory,
 delegatingStateRestoreListener,
-i + 1);
+i + 1,
+KafkaStreams.this::close,

Review comment:
   This will call closeToError but I am testing if that has a problem. So 
far it does not









[GitHub] [kafka] wcarlson5 commented on pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-23 Thread GitBox


wcarlson5 commented on pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#issuecomment-715581699


   Moved to https://github.com/apache/kafka/pull/9487







[GitHub] [kafka] wcarlson5 closed pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-23 Thread GitBox


wcarlson5 closed pull request #9273:
URL: https://github.com/apache/kafka/pull/9273


   







[GitHub] [kafka] wcarlson5 opened a new pull request #9487: KAFKA-9331 add a streams handler

2020-10-23 Thread GitBox


wcarlson5 opened a new pull request #9487:
URL: https://github.com/apache/kafka/pull/9487


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] mjsax commented on a change in pull request #9486: KAFKA-9381: Fix releaseTarGz to publish valid scaladoc files

2020-10-23 Thread GitBox


mjsax commented on a change in pull request #9486:
URL: https://github.com/apache/kafka/pull/9486#discussion_r511135914



##
File path: build.gradle
##
@@ -942,6 +942,7 @@ project(':core') {
 from(project(':streams').jar) { into("libs/") }
 from(project(':streams').configurations.runtime) { into("libs/") }
 from(project(':streams:streams-scala').jar) { into("libs/") }
+
from(project(':streams:streams-scala').configurations.archives.artifacts.files.filter
 { file -> file.name.matches(".*scaladoc.*")}) { into("libs/") }

Review comment:
   Do we actually need to do this? I don't see any other line here that 
would copy JavaDocs-jar into the `lib` directory?
   
   \cc @ijuma 









[GitHub] [kafka] jsancio commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-23 Thread GitBox


jsancio commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r52281



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.raft.RecordSerde;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * TODO: Also flush after minimum size limit is reached?
+ */
+public class BatchAccumulator<T> implements Closeable {
+private final int epoch;
+private final Time time;
+private final Timer lingerTimer;
+private final int lingerMs;
+private final int maxBatchSize;
+private final CompressionType compressionType;
+private final MemoryPool memoryPool;
+private final ReentrantLock lock;
+private final RecordSerde<T> serde;
+
+private long nextOffset;
+private BatchBuilder<T> currentBatch;
+private List<CompletedBatch<T>> completed;
+
+public BatchAccumulator(
+int epoch,
+long baseOffset,
+int lingerMs,
+int maxBatchSize,
+MemoryPool memoryPool,
+Time time,
+CompressionType compressionType,
+RecordSerde<T> serde
+) {
+this.epoch = epoch;
+this.lingerMs = lingerMs;
+this.maxBatchSize = maxBatchSize;
+this.memoryPool = memoryPool;
+this.time = time;
+this.lingerTimer = time.timer(lingerMs);
+this.compressionType = compressionType;
+this.serde = serde;
+this.nextOffset = baseOffset;
+this.completed = new ArrayList<>();
+this.lock = new ReentrantLock();
+}
+
+/**
+ * Append a list of records into an atomic batch. We guarantee all records
+ * are included in the same underlying record batch so that either all of
+ * the records become committed or none of them do.
+ *
+ * @param epoch the expected leader epoch
+ * @param records the list of records to include in a batch
+ * @return the offset of the last message or {@link Long#MAX_VALUE} if the 
epoch
+ * does not match
+ */
+public Long append(int epoch, List<T> records) {
+if (epoch != this.epoch) {
+// If the epoch does not match, then the state machine probably
+// has not gotten the notification about the latest epoch change.
+// In this case, ignore the append and return a large offset value
+// which will never be committed.
+return Long.MAX_VALUE;
+}
+
+Object serdeContext = serde.newWriteContext();
+int batchSize = 0;
+for (T record : records) {
+batchSize += serde.recordSize(record, serdeContext);
+}
+
+if (batchSize > maxBatchSize) {
+throw new IllegalArgumentException("The total size of " + records 
+ " is " + batchSize +
+", which exceeds the maximum allowed batch size of " + 
maxBatchSize);
+}
+
+lock.lock();
+try {
+BatchBuilder<T> batch = maybeAllocateBatch(batchSize);
+if (batch == null) {
+return null;
+}
+
+if (isEmpty()) {
+lingerTimer.update();
+lingerTimer.reset(lingerMs);
+}
+
+for (T record : records) {
+batch.appendRecord(record, serdeContext);
+nextOffset += 1;
+}
+
+return nextOffset - 1;
+} finally {
+lock.unlock();
+}
+}
+
+private BatchBuilder<T> maybeAllocateBatch(int batchSize) {
+if (currentBatch == null) {
+startNewBatch();
+} else if (!currentBatch.hasRoomFor(batchSize)) {
+

[GitHub] [kafka] bbejeck commented on pull request #9486: KAFKA-9381: Fix releaseTarGz to publish valid scaladoc files

2020-10-23 Thread GitBox


bbejeck commented on pull request #9486:
URL: https://github.com/apache/kafka/pull/9486#issuecomment-715558972


   >We probably only want to publish scaladoc for kafka streams since it's the 
only module that has a public Scala API.
   
   ack, working on that now







[GitHub] [kafka] mjsax commented on pull request #9486: KAFKA-9381: Fix releaseTarGz to publish valid scaladoc files

2020-10-23 Thread GitBox


mjsax commented on pull request #9486:
URL: https://github.com/apache/kafka/pull/9486#issuecomment-715556674


   @ijuma Correct! That is what KAFKA-9381 actually is all about.
   
   LGTM.







[GitHub] [kafka] ijuma commented on pull request #9486: KAFKA-9381: Fix releaseTarGz to publish valid scaladoc files

2020-10-23 Thread GitBox


ijuma commented on pull request #9486:
URL: https://github.com/apache/kafka/pull/9486#issuecomment-715554159


   We probably only want to publish scaladoc for kafka streams since it's the 
only module that has a public Scala API.







[jira] [Updated] (KAFKA-10638) QueryableStateIntegrationTest fails due to stricter store checking

2020-10-23 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-10638:

Fix Version/s: 2.7.0

> QueryableStateIntegrationTest fails due to stricter store checking
> --
>
> Key: KAFKA-10638
> URL: https://issues.apache.org/jira/browse/KAFKA-10638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.7.0
>
>
> Observed:
> {code:java}
> org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state 
> store source-table because the stream thread is PARTITIONS_ASSIGNED, not 
> RUNNING
>   at 
> org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81)
>   at 
> org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50)
>   at 
> org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:52)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:200)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119)
>   at 

[GitHub] [kafka] bbejeck commented on pull request #9486: KAFKA-9381: Fix releaseTarGz to publish valid scaladoc files

2020-10-23 Thread GitBox


bbejeck commented on pull request #9486:
URL: https://github.com/apache/kafka/pull/9486#issuecomment-715526717


   Results of rendering contents of scaladoc jar
   
   ![Screen Shot 2020-10-23 at 2 59 32 
PM](https://user-images.githubusercontent.com/199238/97043484-8e095a80-1540-11eb-9711-b9a801db24f5.png)
   







[GitHub] [kafka] bbejeck commented on a change in pull request #9486: KAFKA-9381: Fix releaseTarGz to publish valid scaladoc files

2020-10-23 Thread GitBox


bbejeck commented on a change in pull request #9486:
URL: https://github.com/apache/kafka/pull/9486#discussion_r511082358



##
File path: build.gradle
##
@@ -1439,7 +1440,7 @@ project(':streams:streams-scala') {
   }
 
   jar {
-dependsOn 'copyDependantLibs'
+dependsOn 'copyDependantLibs' , 'scaladocJar'

Review comment:
   @ijuma good point, I've removed it and still get the required doc jars









[jira] [Commented] (KAFKA-10638) QueryableStateIntegrationTest fails due to stricter store checking

2020-10-23 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219873#comment-17219873
 ] 

John Roesler commented on KAFKA-10638:
--

Hey [~bbejeck] , I've marked this as a 2.7.0 blocker. It's a regression (due to 
KAFKA-10598), but fortunately, only a regression in the store query test. 
Still, we should get the fix in.

> QueryableStateIntegrationTest fails due to stricter store checking
> --
>
> Key: KAFKA-10638
> URL: https://issues.apache.org/jira/browse/KAFKA-10638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
>
> Observed:
> {code:java}
> org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state 
> store source-table because the stream thread is PARTITIONS_ASSIGNED, not 
> RUNNING
>   at 
> org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81)
>   at 
> org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50)
>   at 
> org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:52)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:200)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>   at 

[jira] [Created] (KAFKA-10638) QueryableStateIntegrationTest fails due to stricter store checking

2020-10-23 Thread John Roesler (Jira)
John Roesler created KAFKA-10638:


 Summary: QueryableStateIntegrationTest fails due to stricter store 
checking
 Key: KAFKA-10638
 URL: https://issues.apache.org/jira/browse/KAFKA-10638
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.7.0
Reporter: John Roesler
Assignee: John Roesler


Observed:
{code:java}
org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state 
store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING
at 
org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81)
at 
org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50)
at 
org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:52)
at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:200)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at 

[GitHub] [kafka] ijuma commented on a change in pull request #9486: KAFKA-9381: Fix releaseTarGz to publish valid scaladoc files

2020-10-23 Thread GitBox


ijuma commented on a change in pull request #9486:
URL: https://github.com/apache/kafka/pull/9486#discussion_r511073744



##
File path: build.gradle
##
@@ -1439,7 +1440,7 @@ project(':streams:streams-scala') {
   }
 
   jar {
-dependsOn 'copyDependantLibs'
+dependsOn 'copyDependantLibs' , 'scaladocJar'

Review comment:
   Hmm, seems weird to require scaladoc when building jars.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10633) Constant probing rebalances in Streams 2.6

2020-10-23 Thread Bradley Peterson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219866#comment-17219866
 ] 

Bradley Peterson commented on KAFKA-10633:
--

Thank you, Sophie! I'll do a build with the PR for KAFKA-10455 and confirm that 
it fixes this issue.

> Constant probing rebalances in Streams 2.6
> --
>
> Key: KAFKA-10633
> URL: https://issues.apache.org/jira/browse/KAFKA-10633
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Bradley Peterson
>Priority: Major
> Attachments: Discover 2020-10-21T23 34 03.867Z - 2020-10-21T23 44 
> 46.409Z.csv
>
>
> We are seeing a few issues with the new rebalancing behavior in Streams 2.6. 
> This ticket is for constant probing rebalances on one StreamThread, but I'll 
> mention the other issues, as they may be related.
> First, when we redeploy the application we see tasks being moved, even though 
> the task assignment was stable before redeploying. We would expect to see 
> tasks assigned back to the same instances and no movement. The application is 
> in EC2, with persistent EBS volumes, and we use static group membership to 
> avoid rebalancing. To redeploy the app we terminate all EC2 instances. The 
> new instances will reattach the EBS volumes and use the same group member id.
> After redeploying, we sometimes see the group leader go into a tight probing 
> rebalance loop. This doesn't happen immediately, it could be several hours 
> later. Because the redeploy caused task movement, we see expected probing 
> rebalances every 10 minutes. But, then one thread will go into a tight loop 
> logging messages like "Triggering the followup rebalance scheduled for 
> 1603323868771 ms.", handling the partition assignment (which doesn't change), 
> then "Requested to schedule probing rebalance for 1603323868771 ms." This 
> repeats several times a second until the app is restarted again. I'll attach 
> a log export from one such incident.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-10-23 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10284:
-
Fix Version/s: 2.5.2

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Assignee: feyman
>Priority: Critical
>  Labels: help-wanted
> Fix For: 2.7.0, 2.5.2, 2.6.1
>
> Attachments: How to reproduce the issue in KAFKA-10284.md
>
>
> For known static members rejoin, we would update its corresponding member.id 
> without triggering a new rebalance. This serves the purpose for avoiding 
> unnecessary rebalance for static membership, as well as fencing purpose if 
> some still uses the old member.id. 
> The bug is that we don't actually persist the membership update, so if no 
> upcoming rebalance gets triggered, this new member.id information will get 
> lost during group coordinator immigration, thus bringing up the zombie member 
> identity.
> The bug find credit goes to [~hachikuji] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

2020-10-23 Thread GitBox


vvcephei commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-715502309


   Thanks for that fix, @dongjinleekr !
   
   Unfortunately, there are still 61 test failures: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9414/6/?cloudbees-analytics-link=scm-reporting%2Fstage%2Ffailure#showFailuresLink
   
   Do you mind taking another look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10633) Constant probing rebalances in Streams 2.6

2020-10-23 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219861#comment-17219861
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10633:


A thread should always reset its scheduled rebalance after triggering one, and 
it will only set the rebalance schedule when it receives its assignment after a 
rebalance. And the probing rebalance is always scheduled for 10min past the 
current time, so the fact that you see the same time printed in the "Triggering 
the follow rebalance" and "Requested to schedule probing rebalance" messages 
indicates that the member did not actually go through a rebalance, it just 
received its previous assignment directly from the broker.

Also, a rebalance will never be completed in under a second, so seeing 
`Triggering the followup rebalance scheduled for 1603323868771 ms` followed 
immediately by `Requested to schedule probing rebalance for 1603323868771 ms`   
seems to verify that it did not in fact go through a rebalance.

[~thebearmayor] this issue is fixed in 2.7.0 and 2.6.1 – not sure when 2.6.1 
will be released but the 2.7.0 release is currently in progress so it should 
hopefully be available soon. Apologies for our oversight in designing KIP-441
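
As a rough illustration of the scheduling contract described above, here is a minimal sketch with hypothetical names (not the actual StreamThread code): the deadline is armed when an assignment is received and cleared once the trigger fires.

{code:java}
// Minimal sketch of the probing-rebalance schedule described above.
// Names are hypothetical; this is not the actual Kafka Streams code.
class ProbingRebalanceSchedule {
    static final long PROBING_REBALANCE_INTERVAL_MS = 10 * 60 * 1000L;
    private long nextProbingRebalanceMs = Long.MAX_VALUE;

    // Called when an assignment is received and a follow-up probing rebalance is requested.
    void onAssignmentReceived(long nowMs) {
        nextProbingRebalanceMs = nowMs + PROBING_REBALANCE_INTERVAL_MS;
    }

    // Called from the poll loop; returns true when a rebalance should be triggered,
    // and resets the deadline so the same trigger cannot fire again immediately.
    boolean maybeTriggerProbingRebalance(long nowMs) {
        if (nowMs >= nextProbingRebalanceMs) {
            nextProbingRebalanceMs = Long.MAX_VALUE;
            return true;
        }
        return false;
    }
}
{code}

The loop reported in this ticket matches the case where the trigger fires but no rebalance actually runs, so the member keeps receiving the same assignment and keeps re-arming the same stale deadline.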

> Constant probing rebalances in Streams 2.6
> --
>
> Key: KAFKA-10633
> URL: https://issues.apache.org/jira/browse/KAFKA-10633
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Bradley Peterson
>Priority: Major
> Attachments: Discover 2020-10-21T23 34 03.867Z - 2020-10-21T23 44 
> 46.409Z.csv
>
>
> We are seeing a few issues with the new rebalancing behavior in Streams 2.6. 
> This ticket is for constant probing rebalances on one StreamThread, but I'll 
> mention the other issues, as they may be related.
> First, when we redeploy the application we see tasks being moved, even though 
> the task assignment was stable before redeploying. We would expect to see 
> tasks assigned back to the same instances and no movement. The application is 
> in EC2, with persistent EBS volumes, and we use static group membership to 
> avoid rebalancing. To redeploy the app we terminate all EC2 instances. The 
> new instances will reattach the EBS volumes and use the same group member id.
> After redeploying, we sometimes see the group leader go into a tight probing 
> rebalance loop. This doesn't happen immediately, it could be several hours 
> later. Because the redeploy caused task movement, we see expected probing 
> rebalances every 10 minutes. But, then one thread will go into a tight loop 
> logging messages like "Triggering the followup rebalance scheduled for 
> 1603323868771 ms.", handling the partition assignment (which doesn't change), 
> then "Requested to schedule probing rebalance for 1603323868771 ms." This 
> repeats several times a second until the app is restarted again. I'll attach 
> a log export from one such incident.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10633) Constant probing rebalances in Streams 2.6

2020-10-23 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219859#comment-17219859
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10633:


I'm like 99% sure this will turn out to be KAFKA-10455

> Constant probing rebalances in Streams 2.6
> --
>
> Key: KAFKA-10633
> URL: https://issues.apache.org/jira/browse/KAFKA-10633
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Bradley Peterson
>Priority: Major
> Attachments: Discover 2020-10-21T23 34 03.867Z - 2020-10-21T23 44 
> 46.409Z.csv
>
>
> We are seeing a few issues with the new rebalancing behavior in Streams 2.6. 
> This ticket is for constant probing rebalances on one StreamThread, but I'll 
> mention the other issues, as they may be related.
> First, when we redeploy the application we see tasks being moved, even though 
> the task assignment was stable before redeploying. We would expect to see 
> tasks assigned back to the same instances and no movement. The application is 
> in EC2, with persistent EBS volumes, and we use static group membership to 
> avoid rebalancing. To redeploy the app we terminate all EC2 instances. The 
> new instances will reattach the EBS volumes and use the same group member id.
> After redeploying, we sometimes see the group leader go into a tight probing 
> rebalance loop. This doesn't happen immediately, it could be several hours 
> later. Because the redeploy caused task movement, we see expected probing 
> rebalances every 10 minutes. But, then one thread will go into a tight loop 
> logging messages like "Triggering the followup rebalance scheduled for 
> 1603323868771 ms.", handling the partition assignment (which doesn't change), 
> then "Requested to schedule probing rebalance for 1603323868771 ms." This 
> repeats several times a second until the app is restarted again. I'll attach 
> a log export from one such incident.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei merged pull request #9481: KAFKA-10284: Disable static membership test in 2.4

2020-10-23 Thread GitBox


vvcephei merged pull request #9481:
URL: https://github.com/apache/kafka/pull/9481


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9481: KAFKA-10284: Disable static membership test in 2.4

2020-10-23 Thread GitBox


vvcephei commented on pull request #9481:
URL: https://github.com/apache/kafka/pull/9481#issuecomment-715499343


   Thanks @abbccdda !



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9481: KAFKA-10284: Disable static membership test in 2.4

2020-10-23 Thread GitBox


vvcephei commented on a change in pull request #9481:
URL: https://github.com/apache/kafka/pull/9481#discussion_r511058225



##
File path: tests/kafkatest/tests/streams/streams_static_membership_test.py
##
@@ -48,6 +49,7 @@ def __init__(self, test_context):
throughput=1000,
acks=1)
 
+@ignore

Review comment:
   ```suggestion
   # This test fails due to a bug that is fixed in 2.5+ (KAFKA-10284). We 
opted not to backport
   # the fix to 2.4 and instead marked this test as ignored. If desired, 
the fix can be backported,
   # but it is non-trivial to do so.
   @ignore
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-23 Thread GitBox


rajinisivaram commented on pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#issuecomment-715493391


   @hachikuji Thanks for the review, have addressed the comments.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-23 Thread GitBox


rajinisivaram commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r511051808



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -770,7 +770,7 @@ class ReplicaManager(val config: KafkaConfig,
 logManager.abortAndPauseCleaning(topicPartition)
 
 val initialFetchState = 
InitialFetchState(BrokerEndPoint(config.brokerId, "localhost", -1),
-  partition.getLeaderEpoch, futureLog.highWatermark)
+  partition.getLeaderEpoch, futureLog.highWatermark, 
lastFetchedEpoch = None)

Review comment:
   I had reset this because 
`ReassignPartitionsIntegrationTest.testAlterLogDirReassignmentThrottle` was 
failing consistently when the offset was set. Having spent a whole day looking 
at what I had broken, I think it is an existing issue. The test itself looks 
new and I can recreate it on trunk. Looks like 
https://issues.apache.org/jira/browse/KAFKA-9087. Will look into that 
separately. For now, I have updated this path.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-23 Thread GitBox


rajinisivaram commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r511047510



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -629,7 +680,9 @@ abstract class AbstractFetcherThread(name: String,
 
   val initialLag = leaderEndOffset - offsetToFetch
   fetcherLagStats.getAndMaybePut(topicPartition).lag = initialLag
-  PartitionFetchState(offsetToFetch, Some(initialLag), currentLeaderEpoch, 
state = Fetching)
+  // We don't expect diverging epochs from the next fetch request, so 
resetting `lastFetchedEpoch`

Review comment:
   Updated





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-23 Thread GitBox


rajinisivaram commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r511047757



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -341,11 +352,18 @@ abstract class AbstractFetcherThread(name: String,
   // ReplicaDirAlterThread may have removed topicPartition 
from the partitionStates after processing the partition data
   if (validBytes > 0 && 
partitionStates.contains(topicPartition)) {
 // Update partitionStates only if there is no 
exception during processPartitionData
-val newFetchState = PartitionFetchState(nextOffset, 
Some(lag), currentFetchState.currentLeaderEpoch, state = Fetching)
+val newFetchState = PartitionFetchState(nextOffset, 
Some(lag),
+  currentFetchState.currentLeaderEpoch, state = 
Fetching,
+  Some(currentFetchState.currentLeaderEpoch))

Review comment:
   Oops, fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-23 Thread GitBox


rajinisivaram commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r511046915



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -426,21 +454,42 @@ abstract class AbstractFetcherThread(name: String,
 warn(s"Partition $topicPartition marked as failed")
   }
 
-  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): 
Set[TopicPartition] = {
+  /**
+   * Returns initial partition fetch state based on current state and the 
provided `initialFetchState`.
+   * From IBP 2.7 onwards, we can rely on truncation based on diverging data 
returned in fetch responses.
+   * For older versions, we can skip the truncation step iff the leader epoch 
matches the existing epoch.
+   */
+  private def partitionFetchState(tp: TopicPartition, initialFetchState: 
InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
+if (isTruncationOnFetchSupported && initialFetchState.initOffset >= 0 && 
initialFetchState.lastFetchedEpoch.nonEmpty) {
+  if (currentState == null) {
+return PartitionFetchState(initialFetchState.initOffset, None, 
initialFetchState.currentLeaderEpoch,
+  state = Fetching, initialFetchState.lastFetchedEpoch)
+  }
+  // If we are in `Fetching` state we can continue to fetch regardless of 
current leader epoch and truncate
+  // if necessary based on diverging epochs returned by the leader. If we 
are currently in Truncating state,
+  // fall through and handle based on current epoch.
+  if (currentState.state == Fetching) {
+return currentState

Review comment:
   Updated.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-23 Thread GitBox


rajinisivaram commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r511047109



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -461,8 +510,9 @@ abstract class AbstractFetcherThread(name: String,
 val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
   case Some(offsetTruncationState) =>
 val state = if (offsetTruncationState.truncationCompleted) 
Fetching else Truncating
+// Resetting `lastFetchedEpoch` since we are truncating and don't 
expect diverging epoch in the next fetch

Review comment:
   Updated.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck opened a new pull request #9486: KAFKA-9381: Fix releaseTarGz to publish valid scaladoc files

2020-10-23 Thread GitBox


bbejeck opened a new pull request #9486:
URL: https://github.com/apache/kafka/pull/9486


   Fixes the release target to produce valid scaladoc and javadoc jar files
   
   To test:
   
   1. `./gradlew clean && ./gradlewAll releaseTarGz
   2. cd into `core/build/distributions`
   3. run `tar xvzf kafka_2.13-2.8.0-SNAPSHOT.tgz`
   4. run `ls -la kafka_2.13-2.8.0-SNAPSHOT/libs | grep doc`
   5. This should produce the following
   ```
   -rw-r--r--1 bill  staff   1326180 Oct 23 12:58 
kafka-streams-scala_2.13-2.8.0-SNAPSHOT-scaladoc.jar
   -rw-r--r--1 bill  staff413518 Oct 23 12:56 
kafka_2.13-2.8.0-SNAPSHOT-javadoc.jar
   -rw-r--r--1 bill  staff   6554029 Oct 23 12:57 
kafka_2.13-2.8.0-SNAPSHOT-scaladoc.jar
  ```
   Note the size of the docs jars
   
   6. Repeat steps for `kafka_2.12-2.8.0-SNAPSHOT.tgz `
   
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9381) Javadocs + Scaladocs not published on maven central

2020-10-23 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-9381:
---
Priority: Blocker  (was: Critical)

> Javadocs + Scaladocs not published on maven central
> ---
>
> Key: KAFKA-9381
> URL: https://issues.apache.org/jira/browse/KAFKA-9381
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1
>Reporter: Julien Jean Paul Sirocchi
>Assignee: Bill Bejeck
>Priority: Blocker
> Fix For: 2.8.0
>
>
> As per title, empty (aside from MANIFEST, LICENCE and NOTICE) 
> javadocs/scaladocs jars on central for any version (Kafka or Scala), e.g.
> [http://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-scala_2.12/2.3.1/]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9381) Javadocs + Scaladocs not published on maven central

2020-10-23 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-9381:
---
Fix Version/s: (was: 2.8.0)
   2.7.0

> Javadocs + Scaladocs not published on maven central
> ---
>
> Key: KAFKA-9381
> URL: https://issues.apache.org/jira/browse/KAFKA-9381
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1
>Reporter: Julien Jean Paul Sirocchi
>Assignee: Bill Bejeck
>Priority: Blocker
> Fix For: 2.7.0
>
>
> As per title, empty (aside from MANIFEST, LICENCE and NOTICE) 
> javadocs/scaladocs jars on central for any version (Kafka or Scala), e.g.
> [http://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-scala_2.12/2.3.1/]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna edited a comment on pull request #9479: KAFKA-10631: Handle ProducerFencedException on offset commit

2020-10-23 Thread GitBox


cadonna edited a comment on pull request #9479:
URL: https://github.com/apache/kafka/pull/9479#issuecomment-715463242


   Yeah, that was also my conclusion!
   Additionally, I trust @abbccdda's approval since -- according to the git 
history -- he is familiar with the code.   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #9479: KAFKA-10631: Handle ProducerFencedException on offset commit

2020-10-23 Thread GitBox


cadonna commented on pull request #9479:
URL: https://github.com/apache/kafka/pull/9479#issuecomment-715463242


   Yeah, that was also my conclusion!
   Additionally, I trust @abbccdda approval since -- according to the git 
history -- he is familiar with the code.   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-10-23 Thread GitBox


abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r511012913



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1005,6 +1013,36 @@ private[kafka] class Processor(val id: Int,
 selector.clearCompletedReceives()
   }
 
+  private def parseEnvelopeRequest(receive: NetworkReceive,
+   nowNanos: Long,
+   connectionId: String,
+   context: RequestContext,
+   principalSerde: 
Option[KafkaPrincipalSerde]) = {
+val envelopeRequest = 
context.parseRequest(receive.payload).request.asInstanceOf[EnvelopeRequest]
+
+val originalHeader = RequestHeader.parse(envelopeRequest.requestData)
+// Leave the principal null here is ok since we will fail the request 
during Kafka API handling.
+val originalPrincipal = if (principalSerde.isDefined)
+  principalSerde.get.deserialize(envelopeRequest.principalData)
+else
+  null
+
+// The forwarding broker and the active controller should have the same 
DNS resolution, and we need
+// to have the original client address for authentication purpose.
+val originalClientAddress = 
InetAddress.getByName(envelopeRequest.clientHostName)

Review comment:
    So the proposal is simply about saving the unnecessary DNS translation? 
Not sure whether representing the address as bytes would serve the security purpose as well.
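
   For illustration, a minimal sketch of the two decoding options being weighed here, using only standard java.net calls (the helper names are made up for the example):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

class ClientAddressDecoding {
    // Current approach in the diff above: resolving by name may trigger a DNS
    // lookup if the string is a hostname rather than a literal IP address.
    static InetAddress fromHostName(String clientHostName) throws UnknownHostException {
        return InetAddress.getByName(clientHostName);
    }

    // Alternative raised in this thread: carrying the raw address bytes avoids
    // any DNS resolution, since getByAddress never performs a lookup.
    static InetAddress fromRawBytes(byte[] clientAddress) throws UnknownHostException {
        return InetAddress.getByAddress(clientAddress);
    }
}
```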





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #9364: KAFKA-10471 Mark broker crash during log loading as unclean shutdown

2020-10-23 Thread GitBox


junrao commented on a change in pull request #9364:
URL: https://github.com/apache/kafka/pull/9364#discussion_r510527544



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -298,26 +300,38 @@ class LogManager(logDirs: Seq[File],
   /**
* Recover and load all logs in the given data directories
*/
-  private def loadLogs(): Unit = {
+  private[log] def loadLogs(): Unit = {
 info(s"Loading logs from log dirs $liveLogDirs")
 val startMs = time.hiResClockMs()
 val threadPools = ArrayBuffer.empty[ExecutorService]
 val offlineDirs = mutable.Set.empty[(String, IOException)]
-val jobs = mutable.Map.empty[File, Seq[Future[_]]]
+val jobs = ArrayBuffer.empty[Seq[Future[_]]]
 var numTotalLogs = 0
 
 for (dir <- liveLogDirs) {
   val logDirAbsolutePath = dir.getAbsolutePath
+  var hadCleanShutdown: Boolean = false
   try {
 val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
 threadPools.append(pool)
 
 val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
 if (cleanShutdownFile.exists) {
   info(s"Skipping recovery for all logs in $logDirAbsolutePath since 
clean shutdown file was found")
+  // Cache the clean shutdown status and use that for rest of log 
loading workflow. Delete the CleanShutdownFile
+  // so that if broker crashes while loading the log, it is considered 
hard shutdown during the next boot up. KAFKA-10471
+  try {
+cleanShutdownFile.delete()
+  } catch {
+case e: IOException =>

Review comment:
   This is an existing issue, but cleanShutdownFile.delete() doesn't seem 
to throw IOException.
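
   For reference, a minimal sketch of the difference (helper names are made up): `File.delete()` reports failure only through its boolean return value, whereas the NIO variant does throw `IOException`, which would make the catch branch above reachable.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

class CleanShutdownFileDeletion {
    // File.delete() signals failure via its return value and does not throw IOException.
    static boolean deleteQuietly(File cleanShutdownFile) {
        return cleanShutdownFile.delete();
    }

    // Files.deleteIfExists throws IOException on failure, so an IOException
    // handler around this call is meaningful.
    static void deleteOrThrow(File cleanShutdownFile) throws IOException {
        Files.deleteIfExists(cleanShutdownFile.toPath());
    }
}
```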

##
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##
@@ -1257,7 +1328,7 @@ class LogTest {
 log.close()
 
 // After reloading log, producer state should not be regenerated
-val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
+val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L, 
lastShutdownClean = false)

Review comment:
   Hmm, it seems that this test expects a clean shutdown.

##
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##
@@ -2131,12 +2202,12 @@ class LogTest {
   assertEquals("Should have same number of time index entries as before.", 
numTimeIndexEntries, log.activeSegment.timeIndex.entries)
 }
 
-log = createLog(logDir, logConfig, recoveryPoint = lastOffset)
+log = createLog(logDir, logConfig, recoveryPoint = lastOffset, 
lastShutdownClean = false)

Review comment:
   It seems that this expects a clean shutdown.

##
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##
@@ -4445,11 +4504,9 @@ class LogTest {
 (log, segmentWithOverflow)
   }
 
-  private def recoverAndCheck(config: LogConfig,
-  expectedKeys: Iterable[Long],
-  expectDeletedFiles: Boolean = true): Log = {
-LogTest.recoverAndCheck(logDir, config, expectedKeys, brokerTopicStats, 
mockTime, mockTime.scheduler,
-  expectDeletedFiles)
+  private def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Long], 
expectDeletedFiles: Boolean = true) = {

Review comment:
   This is an existing problem. Since no callers are explicitly setting 
expectDeletedFiles, could we just remove this param?

##
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##
@@ -3623,7 +3690,7 @@ class LogTest {
 log.close()
 
 // reopen the log and recover from the beginning
-val recoveredLog = createLog(logDir, LogConfig())
+val recoveredLog = createLog(logDir, LogConfig(), lastShutdownClean = 
false)

Review comment:
   It seems the createLog() call on line 3976 inside 
testRecoverOnlyLastSegment() needs to have lastShutdownClean = false.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-10-23 Thread GitBox


abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r511007228



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -126,11 +125,44 @@ class KafkaApis(val requestChannel: RequestChannel,
 info("Shutdown complete.")
   }
 
+  private def checkForwarding(request: RequestChannel.Request): Unit = {
+if (!request.header.apiKey.forwardable && 
request.envelopeContext.isDefined) {

Review comment:
   For pt2, if the forwarding is not enabled on the active controller, but 
it has the capability, should we just serve the request?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10627) Connect TimestampConverter transform does not support multiple formats for the same field and only allows one field to be transformed at a time

2020-10-23 Thread Joshua Grisham (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219788#comment-17219788
 ] 

Joshua Grisham commented on KAFKA-10627:


I have actually made the changes that I think will work for all of this.  Will 
try to put together a PR maybe on Monday.

> Connect TimestampConverter transform does not support multiple formats for 
> the same field and only allows one field to be transformed at a time
> ---
>
> Key: KAFKA-10627
> URL: https://issues.apache.org/jira/browse/KAFKA-10627
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Joshua Grisham
>Priority: Minor
>
> Some of the limitations of the *TimestampConverter* transform are causing 
> issues for us since we have a lot of different producers from different 
> systems producing events to some of our topics.  We try our best to have 
> governance on the data formats including strict usage of Avro schemas but 
> there are still variations in timestamp data types that are allowed by the 
> schema.
> In the end there will be multiple formats coming into the same timestamp 
> fields (for example, with and without milliseconds, with and without a 
> timezone specifier, etc).
> And then you get failed events in Connect with messages like this:
> {noformat}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
>   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
>   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>   at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
>   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.connect.errors.DataException: Could not parse timestamp: value (2020-10-06T12:12:27Z) does not match pattern (yyyy-MM-dd'T'HH:mm:ss.SSSX)
>   at org.apache.kafka.connect.transforms.TimestampConverter$1.toRaw(TimestampConverter.java:120)
>   at org.apache.kafka.connect.transforms.TimestampConverter.convertTimestamp(TimestampConverter.java:450)
>   at org.apache.kafka.connect.transforms.TimestampConverter.applyValueWithSchema(TimestampConverter.java:375)
>   at org.apache.kafka.connect.transforms.TimestampConverter.applyWithSchema(TimestampConverter.java:362)
>   at org.apache.kafka.connect.transforms.TimestampConverter.apply(TimestampConverter.java:279)
>   at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
>   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>   ... 14 more
> Caused by: java.text.ParseException: Unparseable date: "2020-10-06T12:12:27Z"
>   at java.text.DateFormat.parse(DateFormat.java:366)
>   at org.apache.kafka.connect.transforms.TimestampConverter$1.toRaw(TimestampConverter.java)
>   ... 21 more
> {noformat}
>  
> My thinking is that maybe a good solution is to switch from using 
> *java.util.Date* to instead using *java.util.Time*, then instead of 
> *SimpleDateFormatter* switch to *DateTimeFormatter* which will allow usage of 
> more sophisticated patterns in the config to match multiple different 
> allowable formats.
> For example instead of effectively doing this:
> {code:java}
> SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");{code}
> It can be something like this:
> {code:java}
> DateTimeFormatter format = DateTimeFormatter.ofPattern("[yyyy-MM-dd[['T'][ ]HH:mm:ss[.SSSz][.SSS[XXX][X");{code}
> Also if there are multiple timestamp fields in the schema/events, then 
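
As a rough sketch of that idea (not the actual TimestampConverter change; the class and method names are made up for the example), a single java.time formatter can accept both of the variants quoted above, with and without an offset or fractional seconds:

{code:java}
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.util.Date;

class FlexibleTimestampParsing {
    // ISO_DATE_TIME accepts ISO-8601 timestamps with or without fractional
    // seconds and with or without an offset/zone suffix.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ISO_DATE_TIME;

    static Date parse(String value) {
        TemporalAccessor parsed = FORMAT.parseBest(value, OffsetDateTime::from, LocalDateTime::from);
        OffsetDateTime odt = parsed instanceof OffsetDateTime
                ? (OffsetDateTime) parsed
                : ((LocalDateTime) parsed).atOffset(ZoneOffset.UTC); // assume UTC when no offset is given
        return Date.from(odt.toInstant());
    }

    public static void main(String[] args) {
        // Both shapes seen in the failing events parse with the same formatter:
        System.out.println(parse("2020-10-06T12:12:27Z"));
        System.out.println(parse("2020-10-06T12:12:27.123"));
    }
}
{code}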

[GitHub] [kafka] justpolidor commented on pull request #8500: Make Kafka Connect Source idempotent

2020-10-23 Thread GitBox


justpolidor commented on pull request #8500:
URL: https://github.com/apache/kafka/pull/8500#issuecomment-715446450


   Why this pull request isn't merged? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] justpolidor edited a comment on pull request #8500: Make Kafka Connect Source idempotent

2020-10-23 Thread GitBox


justpolidor edited a comment on pull request #8500:
URL: https://github.com/apache/kafka/pull/8500#issuecomment-715446450


   Why isn't this pull request merged? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9479: KAFKA-10631: Handle ProducerFencedException on offset commit

2020-10-23 Thread GitBox


ableegoldman commented on pull request #9479:
URL: https://github.com/apache/kafka/pull/9479#issuecomment-715443836


   Ah nevermind, I see that this was introduced when we swapped 
INVALID_PRODUCER_EPOCH for PRODUCER_FENCED so it makes sense that we should 
handle PRODUCER_FENCED in the same way as the old error code



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-23 Thread GitBox


jsancio commented on pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#issuecomment-715440476


   Thanks @hachikuji for reviewing and merging the PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9479: KAFKA-10631: Handle ProducerFencedException on offset commit

2020-10-23 Thread GitBox


ableegoldman commented on pull request #9479:
URL: https://github.com/apache/kafka/pull/9479#issuecomment-715429024


   @cadonna did we double-check with the core/clients people to make sure this 
is right? It's not immediately obvious to me that this is a problem in the 
TransactionManager and not just in the way we handle it within Streams. But 
maybe it is obvious to someone who looked into the code more than I have
   
   (And I agree with Apurva, how did this not come up sooner? 🤔)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #168: KAFKA-1811 Ensuring registered broker host:port is unique

2020-10-23 Thread GitBox


dajac commented on pull request #168:
URL: https://github.com/apache/kafka/pull/168#issuecomment-715424065


   This has been somewhat addressed by 
https://github.com/apache/kafka/pull/4897. The JIRA has been closed so I will 
close this PR. Please, reopen it if it is still relevant.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac closed pull request #168: KAFKA-1811 Ensuring registered broker host:port is unique

2020-10-23 Thread GitBox


dajac closed pull request #168:
URL: https://github.com/apache/kafka/pull/168


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10024) Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-10-23 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-10024:

Summary: Add dynamic configuration and enforce quota for per-IP connection 
rate limits (KIP-612, part 2)  (was: Add dynamic configuration and enforce 
quota for per-IP connection rate limits)

> Add dynamic configuration and enforce quota for per-IP connection rate limits 
> (KIP-612, part 2)
> ---
>
> Key: KAFKA-10024
> URL: https://issues.apache.org/jira/browse/KAFKA-10024
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Anna Povzner
>Assignee: David Mao
>Priority: Major
>  Labels: features
>
> This JIRA is for the second part of KIP-612 – Add per-IP connection creation 
> rate limits.
> As described here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits

2020-10-23 Thread GitBox


dajac commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r510873494



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1246,7 +1337,57 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): 
Unit = {
 // if there is a connection waiting on the rate throttle delay, we will 
let it wait the original delay even if
 // the rate limit increases, because it is just one connection per 
listener and the code is simpler that way
-updateConnectionRateQuota(maxConnectionRate)
+updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs 
for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, 
otherwise
+   * all metric configs will be checked and updated if required
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default 
if None
+   */
+  def updateIpConnectionRate(ip: Option[String], maxConnectionRate: 
Option[Int]): Unit = {
+def isIpConnectionRateMetric(metricName: MetricName) = {
+  metricName.name == ConnectionRateMetricName &&
+  metricName.group == MetricsGroup &&
+  metricName.tags.containsKey(IpMetricTag)
+}
+
+def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+  quotaLimit != metric.config.quota.bound
+}
+
+ip match {
+  case Some(addr) =>
+val address = InetAddress.getByName(addr)
+maxConnectionRate match {
+  case Some(rate) =>
+info(s"Updating max connection rate override for $address to 
$rate")
+connectionRatePerIp.put(address, rate)
+  case None =>
+info(s"Removing max connection rate override for $address")
+connectionRatePerIp.remove(address)
+}
+updateConnectionRateQuota(connectionRateForIp(address), 
IpQuotaEntity(address))
+  case None =>
+val newQuota = 
maxConnectionRate.getOrElse(DynamicConfig.Ip.DefaultConnectionCreationRate)
+info(s"Updating default max IP connection rate to $newQuota")
+defaultConnectionRatePerIp = newQuota
+val allMetrics = metrics.metrics
+allMetrics.forEach { (metricName, metric) =>
+  if (isIpConnectionRateMetric(metricName) && 
shouldUpdateQuota(metric, newQuota)) {
+info(s"Updating existing connection rate sensor for 
${metricName.tags} to $newQuota")
+metric.config(rateQuotaMetricConfig(newQuota))
+  }

Review comment:
   I wonder if using `newQuota` is correct here. My understanding is that 
`ip == None` means that we update the default quota, which is used if there is 
no per-IP quota defined. So, shouldn't we also check whether a per-IP quota is 
defined before overriding it with the new default?
   
   It would be great if we could add more unit tests to cover this logic with 
multiple IPs and/or default.
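
   To illustrate the guard this comment is asking about, a minimal, self-contained sketch (field and method names are stand-ins, not the actual SocketServer code): when the default changes, only IPs without an explicit override should pick up the new value.

```java
import java.net.InetAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class IpConnectionRateDefaults {
    // Stand-ins for the per-IP override table and the default rate.
    private final Map<InetAddress, Integer> connectionRatePerIp = new ConcurrentHashMap<>();
    private volatile int defaultConnectionRatePerIp = Integer.MAX_VALUE;

    // When the default is updated, leave explicit per-IP overrides untouched.
    void updateDefaultConnectionRate(int newDefault, Iterable<InetAddress> ipsWithMetrics) {
        defaultConnectionRatePerIp = newDefault;
        for (InetAddress ip : ipsWithMetrics) {
            if (!connectionRatePerIp.containsKey(ip)) {
                updateMetricConfig(ip, newDefault);
            }
        }
    }

    private void updateMetricConfig(InetAddress ip, int quota) {
        // placeholder for updating the sensor's metric config in the real code
    }
}
```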

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -526,10 +527,14 @@ private[kafka] abstract class 
AbstractServerThread(connectionQuotas: ConnectionQ
 if (channel != null) {
   debug(s"Closing connection from 
${channel.socket.getRemoteSocketAddress()}")
   connectionQuotas.dec(listenerName, channel.socket.getInetAddress)
-  CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
-  CoreUtils.swallow(channel.close(), this, Level.ERROR)
+  closeSocket(channel)
 }
   }
+
+  protected def closeSocket(channel: SocketChannel): Unit = {

Review comment:
   nit: Could we make this one private?

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1375,6 +1516,45 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
 }
   }
 
+  /**
+   * To avoid over-recording listener/broker connection rate, we unrecord a 
listener or broker connection
+   * if the IP gets throttled later.
+   *
+   * @param listenerName listener to unrecord connection
+   * @param timeMs current time in milliseconds
+   */
+  private def unrecordListenerConnection(listenerName: ListenerName, timeMs: 
Long): Unit = {
+if (!protectedListener(listenerName)) {
+  brokerConnectionRateSensor.record(-1.0, timeMs, false)
+}
+maxConnectionsPerListener
+  .get(listenerName)
+  .foreach(_.connectionRateSensor.record(-1.0, timeMs, false))
+  }
+
+  /**
+   * Calculates the delay needed to bring the observed connection creation 
rate to the IP limit.
+   * If the connection would cause an IP quota violation, un-record the 
connection
+   *
+   * @param address
+   * @param timeMs
+   * @return

Review comment:
   nit: To be completed.

##
File path: 

[jira] [Created] (KAFKA-10637) KafkaProducer: IllegalMonitorStateException

2020-10-23 Thread Lefteris Katiforis (Jira)
Lefteris Katiforis created KAFKA-10637:
--

 Summary: KafkaProducer: IllegalMonitorStateException 
 Key: KAFKA-10637
 URL: https://issues.apache.org/jira/browse/KAFKA-10637
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 2.5.1
Reporter: Lefteris Katiforis


Kafka producer throws the following exception:
{code:java}
{\"log\":\"java.lang.IllegalMonitorStateException: current thread is not 
owner\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415014714Z\"}"}
 java.base/java.lang.Object.wait(Native 
Method)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.41502027Z\"}"}
org.apache.kafka.common.utils.SystemTime.waitObject(SystemTime.java:55)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415024923Z\"}"}
at 
org.apache.kafka.clients.producer.internals.ProducerMetadata.awaitUpdate(ProducerMetadata.java:119)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415029863Z\"}"}
org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1029)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415034336Z\"}"}
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:883)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415038722Z\"}"}
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415042939Z\"}"}
org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:781)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415047238Z\"}"}
org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:562)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415051555Z\"}"}
org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:369)\\n\",\"stream\":\"stdout\",\"time\":\"2020-10-23T12:48:50.415055882Z\"}"}{code}
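
For context on the exception itself: Object.wait may only be called while the current thread holds the object's monitor. A minimal sketch of that rule (illustrative only, unrelated to the producer internals):

{code:java}
class MonitorOwnership {
    private final Object lock = new Object();

    void waitWithoutMonitor() throws InterruptedException {
        lock.wait(); // throws IllegalMonitorStateException: current thread is not owner
    }

    void waitWithMonitor() throws InterruptedException {
        synchronized (lock) {
            lock.wait(); // legal: the current thread owns the monitor of lock
        }
    }
}
{code}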



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2020-10-23 Thread Flavien Raynaud (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219704#comment-17219704
 ] 

Flavien Raynaud commented on KAFKA-8733:


Good point, we've not tried this in production. We haven't had enough 
occurrences of this issue (thankfully :p) to know how long it takes between 
"leader has disk issues and slow reads" and "leader dies because of disk 
failure, revoking leadership", and what would be a good compromise with the "a 
follower is genuinely falling behind" case.
We can try this for some of our clusters with lower consistency requirements, 
but it may be weeks or months before we hit this issue again.

> Offline partitions occur when leader's disk is slow in reads while responding 
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.2, 2.4.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We found offline partitions issue multiple times on some of the hosts in our 
> clusters. After going through the broker logs and hosts’s disk stats, it 
> looks like this issue occurs whenever the read/write operations take more 
> time on that disk. In a particular case where read time is more than the 
> replica.lag.time.max.ms, follower replicas will be out of sync as their 
> earlier fetch requests are stuck while reading the local log and their fetch 
> status is not yet updated as mentioned in the below code of `ReplicaManager`. 
> If there is an issue in reading the data from the log for a duration more 
> than replica.lag.time.max.ms then all the replicas will be out of sync and 
> partition becomes offline if min.isr.replicas > 1 and unclean.leader.election 
> is false.
>  
> {code:java}
> def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
>   val result = readFromLocalLog( // this call took more than 
> `replica.lag.time.max.ms`
>   replicaId = replicaId,
>   fetchOnlyFromLeader = fetchOnlyFromLeader,
>   readOnlyCommitted = fetchOnlyCommitted,
>   fetchMaxBytes = fetchMaxBytes,
>   hardMaxBytesLimit = hardMaxBytesLimit,
>   readPartitionInfo = fetchInfos,
>   quota = quota,
>   isolationLevel = isolationLevel)
>   if (isFromFollower) updateFollowerLogReadResults(replicaId, result). // 
> fetch time gets updated here, but mayBeShrinkIsr should have been already 
> called and the replica is removed from isr
>  else result
>  }
> val logReadResults = readFromLog()
> {code}
> Attached the graphs of disk weighted io time stats when this issue occurred.
> I will raise [KIP-501|https://s.apache.org/jhbpn] describing options on how 
> to handle this scenario.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10466) Add Regex option for replacement on MaskField SMT

2020-10-23 Thread Shadi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219606#comment-17219606
 ] 

Shadi edited comment on KAFKA-10466 at 10/23/20, 10:09 AM:
---

[~daehokimm] Hi, send an email to 
[d...@kafka.apache.org|mailto:to%c2%a0...@kafka.apache.org] to ask for 
contributor permission, and add your Jira id (daehokimm) to the email.


was (Author: kajevand):
[~daehokimm] Hi, send an email to 
[d...@kafka.apache.org|mailto:to%c2%a0...@kafka.apache.org] to get contributor 
permission, and add your Jira id (daehokimm) to the email.

> Add Regex option for replacement on MaskField SMT 
> --
>
> Key: KAFKA-10466
> URL: https://issues.apache.org/jira/browse/KAFKA-10466
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.6.0
>Reporter: Daeho Kim
>Priority: Major
>  Labels: Needs-kip, newbie
>
> The _org.apache.kafka.connect.transforms.MaskField_ SMT can use replacement 
> for numeric and string type fields. It would be nice to optionally be able to 
> mask string fields with a regular expression, so the data can be 
> masked partially.
>  
> Use case: mask out just the last two parts of an IP address (e.g. 
> 123.45.67.89 -> 123.45.***.***).
>  
> Since this changes the API, it will require a KIP.
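
A minimal sketch of the behaviour requested above, using a plain java.util.regex replacement; the class name and the regex are illustrative only and are not part of the existing MaskField SMT:

{code:java}
import java.util.regex.Pattern;

// Hypothetical helper illustrating the requested partial masking; not existing Kafka Connect code.
public class RegexMaskSketch {

    // Keep the first two octets, mask the last two.
    private static final Pattern LAST_TWO_OCTETS =
            Pattern.compile("^(\\d{1,3}\\.\\d{1,3})\\.\\d{1,3}\\.\\d{1,3}$");

    static String mask(String ip) {
        return LAST_TWO_OCTETS.matcher(ip).replaceAll("$1.***.***");
    }

    public static void main(String[] args) {
        System.out.println(mask("123.45.67.89")); // prints 123.45.***.***
    }
}
{code}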



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10466) Add Regex option for replacement on MaskField SMT

2020-10-23 Thread Shadi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219606#comment-17219606
 ] 

Shadi commented on KAFKA-10466:
---

[~daehokimm] Hi, send an email to 
[d...@kafka.apache.org|mailto:to%c2%a0...@kafka.apache.org] to get contributor 
permission, and add your Jira id (daehokimm) to the email.

> Add Regex option for replacement on MaskField SMT 
> --
>
> Key: KAFKA-10466
> URL: https://issues.apache.org/jira/browse/KAFKA-10466
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.6.0
>Reporter: Daeho Kim
>Priority: Major
>  Labels: Needs-kip, newbie
>
> The _org.apache.kafka.connect.transforms.MaskField_ SMT can use replacement 
> for numeric and string type fields. It would be nice to optionally be able to 
> mask string fields with a regular expression, so the data can be 
> masked partially.
>  
> Use case: mask out just the last two parts of an IP address (e.g. 
> 123.45.67.89 -> 123.45.***.***).
>  
> Since this changes the API, it will require a KIP.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] soarez commented on pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers

2020-10-23 Thread GitBox


soarez commented on pull request #9000:
URL: https://github.com/apache/kafka/pull/9000#issuecomment-715213289


   Yes, sorry for that @mjsax . I thought I had already pushed the fix. Not 
enough ☕ 
   
   Fixed it, but it seems one test still failed for JDK 8 - 
`org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta`
 - though it appears unrelated to this patch and [known to be 
flaky](https://github.com/apache/kafka/pull/9466#issuecomment-713246622). 
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9755) Implement versioning scheme for features

2020-10-23 Thread feyman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219553#comment-17219553
 ] 

feyman commented on KAFKA-9755:
---

Hi, [~kprakasam], can I take sub-task 7/8 as start-up tasks to get familiar 
with the feature versioning schemes? Thanks!

> Implement versioning scheme for features
> 
>
> Key: KAFKA-9755
> URL: https://issues.apache.org/jira/browse/KAFKA-9755
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, core, protocol, streams
>Reporter: Kowshik Prakasam
>Priority: Major
>
> Details are in this wiki: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features]
>  .
> This is the master Jira for tracking the implementation of versioning scheme 
> for features to facilitate client discovery and feature gating (as explained 
> in the above wiki).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10323) NullPointerException during rebalance

2020-10-23 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-10323:
---
Labels:   (was: newbie)

> NullPointerException during rebalance
> -
>
> Key: KAFKA-10323
> URL: https://issues.apache.org/jira/browse/KAFKA-10323
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: yazgoo
>Assignee: Luke Chen
>Priority: Major
>
> *confluent platform version: 5.5.0-ccs*
> connector used: s3
> Connector stops after rebalancing:
> ERROR [Worker clientId=connect-1, groupId=connect] Couldn't instantiate task 
>  because it has an invalid task configuration. This task will not 
> execute until reconfigured.
> java.lang.NullPointerException
>  at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:427)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1147)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:126)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1162)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1158)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5676) MockStreamsMetrics should be in o.a.k.test

2020-10-23 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219548#comment-17219548
 ] 

Chia-Ping Tsai commented on KAFKA-5676:
---

Going through the streams code base, it seems most internals directly use 
StreamsMetricsImpl rather than StreamsMetrics. Not sure whether it is 
worthwhile to do a "true" mock of StreamsMetrics.

> MockStreamsMetrics should be in o.a.k.test
> --
>
> Key: KAFKA-5676
> URL: https://issues.apache.org/jira/browse/KAFKA-5676
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie
>  Time Spent: 96h
>  Remaining Estimate: 0h
>
> {{MockStreamsMetrics}}'s package should be `o.a.k.test` not 
> `o.a.k.streams.processor.internals`. 
> In addition, it should not require a {{Metrics}} parameter in its constructor 
> as it is only needed for its extended base class; the right way of mocking 
> should be implementing {{StreamsMetrics}} with mock behavior rather than extending a 
> real implementation of {{StreamsMetricsImpl}}.
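
A minimal sketch of the direction suggested above, assuming a Mockito-based test helper; the helper class is hypothetical, while org.apache.kafka.streams.StreamsMetrics and Mockito.mock are existing APIs:

{code:java}
import org.apache.kafka.streams.StreamsMetrics;
import org.mockito.Mockito;

// Hypothetical test utility: mock the public StreamsMetrics interface instead of
// constructing the internal StreamsMetricsImpl, so no Metrics instance is required.
public class MockStreamsMetricsSketch {
    public static StreamsMetrics create() {
        return Mockito.mock(StreamsMetrics.class);
    }
}
{code}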



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ning2008wisc edited a comment on pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-10-23 Thread GitBox


ning2008wisc edited a comment on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-714955565


   Thanks @mimaison for your high-level advice and detailed review.
   
   (1) I responded to every one of your comments. A "thumb-up" means I made the 
suggested change.
   (2) Regarding the increased complexity, here is my analysis for each test 
case:
(2a) `testReplication`: This is an existing test and nothing new is 
added, except checking that topic configs are also mirrored. If we believe the 
topic config validation is trivial, I am more than happy to remove it.
(2b) `testReplicationWithEmptyPartition`: This is an existing test and 
nothing new is added.
(2c) `testOneWayReplicationWithAutoOffsetSync`: This is an existing 
test and nothing new is added.
(2d) `testWithBrokerRestart`: This is a **new** test, which introduces a 
background producer and consumer, called `ThreadedProducer` and 
`ThreadedConsumer`. The purpose of the background producer and consumer is to 
better test the failure case by avoiding the serialized sequence (produce -> broker restart 
-> consume), which cannot reveal that consumers consume the same 
records more than once when a broker failure happens.
(2e) `MirrorConnectorsIntegrationSSLTest`: This is a **new** test and 
it should be pretty lean and neat.
   
   In addition, the **new** base test class 
`MirrorConnectorsIntegrationBaseTest` follows the existing practice of K-stream 
integration test structure 
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java#L37
 so I assume it is likely the right way to structure tests with more than one 
scenario (SSL vs. non-SSL).
   
   If we have to control the complexity, I would prefer to drop 
`testWithBrokerRestart` and keep `MirrorConnectorsIntegrationSSLTest`, as it 
makes sense to run simple validation in SSL setup.
   
   Thanks and would like to hear more thoughts and feedback. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc edited a comment on pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-10-23 Thread GitBox


ning2008wisc edited a comment on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-714955565


   Thanks @mimaison for your high-level advice and detailed review.
   
   (1) I responded to every one of your comments. A "thumb-up" means I made the 
suggested change.
   (2) Regarding the increased complexity, here is my analysis for each test 
case:
(2a) `testReplication`: This is an existing test and nothing new is 
added, except checking that topic configs are also mirrored. If we believe the 
topic config validation is trivial, I am more than happy to remove it.
(2b) `testReplicationWithEmptyPartition`: This is an existing test and 
nothing new is added.
(2c) `testOneWayReplicationWithAutoOffsetSync`: This is an existing 
test and nothing new is added.
(2d) `testWithBrokerRestart`: This is a **new** test, which introduces a 
background producer and consumer, called `ThreadedProducer` and 
`ThreadedConsumer`. The purpose of the background producer and consumer is to 
better test the failure case by avoiding the serialized sequence (produce -> broker restart 
-> consume), which cannot reveal that consumers consume the same 
records more than once when a broker failure happens.
(2e) `MirrorConnectorsIntegrationSSLTest`: This is a **new** test and 
it should be pretty lean and neat.
   
   In addition, the **new** base test class 
`MirrorConnectorsIntegrationBaseTest` follows the existing practice of K-stream 
integration test structure 
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java#L37
 so I assume it is likely the right way to structure tests with more than one 
scenario (SSL vs. non-SSL).
   
   If we have to control the complexity, I would prefer to drop 
`testWithBrokerRestart` and keep `MirrorConnectorsIntegrationSSLTest`, as it 
makes sense to run simple validation in SSL setup.
   
   Thanks and would like to hear more thoughts and feedback. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc edited a comment on pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-10-23 Thread GitBox


ning2008wisc edited a comment on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-714955565


   Thanks @mimaison for your high-level advice and detailed review.
   
   (1) I responded to every one of your comments. A "thumb-up" means I made the 
suggested change.
   (2) Regarding the increased complexity, here is my analysis for each test 
case:
   
(a) `testReplication`: This is an existing test and nothing new is 
added, except checking that topic configs are also mirrored. If we believe the 
topic config validation is trivial, I am more than happy to remove it.
   
(b) `testReplicationWithEmptyPartition`: This is an existing test and 
nothing new is added.
   
(c) `testOneWayReplicationWithAutoOffsetSync`: This is an existing test 
and nothing new is added.
   
(d) `testWithBrokerRestart`: This is a **new** test, which introduces a 
background producer and consumer, called `ThreadedProducer` and 
`ThreadedConsumer`. The purpose of the background producer and consumer is to 
better test the failure case by avoiding the serialized sequence (produce -> broker restart 
-> consume), which cannot reveal that consumers consume the same 
records more than once when a broker failure happens.
   
(e) `MirrorConnectorsIntegrationSSLTest`: This is a **new** test and it 
should be pretty lean and neat.
   
   In addition, the **new** base test class 
`MirrorConnectorsIntegrationBaseTest` follows the existing practice of K-stream 
integration test structure 
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java#L37
 so I assume it is likely the right way to structure tests with more than one 
scenario (SSL vs. non-SSL).
   
   If we have to control the complexity, I would prefer to drop 
`testWithBrokerRestart` and keep `MirrorConnectorsIntegrationSSLTest`, as it 
makes sense to run simple validation in SSL setup.
   
   Thanks and would like to hear more thoughts and feedback. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc edited a comment on pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-10-23 Thread GitBox


ning2008wisc edited a comment on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-714955565


   Thanks @mimaison for your high-level advice and detailed review.
   
   (1) I responded to every one of your comments. A "thumb-up" means I made the 
suggested change.
   (2) Regarding the increased complexity, here is my analysis for each test 
case:
(a) `testReplication`: This is an existing test and nothing new is 
added, except checking that topic configs are also mirrored. If we believe the 
topic config validation is trivial, I am more than happy to remove it.
(b) `testReplicationWithEmptyPartition`: This is an existing test and 
nothing new is added.
(c) `testOneWayReplicationWithAutoOffsetSync`: This is an existing test 
and nothing new is added.
(d) `testWithBrokerRestart`: This is a **new** test, which introduces a 
background producer and consumer, called `ThreadedProducer` and 
`ThreadedConsumer`. The purpose of the background producer and consumer is to 
better test the failure case by avoiding the serialized sequence (produce -> broker restart 
-> consume), which cannot reveal that consumers consume the same 
records more than once when a broker failure happens.
(e) `MirrorConnectorsIntegrationSSLTest`: This is a **new** test and it 
should be pretty lean and neat.
   
   In addition, the **new** base test class 
`MirrorConnectorsIntegrationBaseTest` follows the existing practice of K-stream 
integration test structure 
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java#L37
 so I assume it is likely the right way to structure tests with more than one 
scenario (SSL vs. non-SSL).
   
   If we have to control the complexity, I would prefer to drop 
`testWithBrokerRestart` and keep `MirrorConnectorsIntegrationSSLTest`, as it 
makes sense to run simple validation in SSL setup.
   
   Thanks and would like to hear more thoughts and feedback. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc edited a comment on pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-10-23 Thread GitBox


ning2008wisc edited a comment on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-714955565


   Thanks @mimaison for your high-level advice and detailed review.
   
   (1) I responded to every one of your comments. A "thumb-up" means I made the 
suggested change.
   (2) Regarding the increased complexity, here is my analysis for each test 
case:
(a) `testReplication`: This is an existing test and nothing new is 
added, except checking that topic configs are also mirrored. If we believe the 
topic config validation is trivial, I am more than happy to remove it.
   
(b) `testReplicationWithEmptyPartition`: This is an existing test and 
nothing new is added.
   
(c) `testOneWayReplicationWithAutoOffsetSync`: This is an existing test 
and nothing new is added.
   
(d) `testWithBrokerRestart`: This is a **new** test, which introduces a 
background producer and consumer, called `ThreadedProducer` and 
`ThreadedConsumer`. The purpose of the background producer and consumer is to 
better test the failure case by avoiding the serialized sequence (produce -> broker restart 
-> consume), which cannot reveal that consumers consume the same 
records more than once when a broker failure happens.
   
(e) `MirrorConnectorsIntegrationSSLTest`: This is a **new** test and it 
should be pretty lean and neat.
   
   In addition, the **new** base test class 
`MirrorConnectorsIntegrationBaseTest` follows the existing practice of K-stream 
integration test structure 
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java#L37
 so I assume it is likely the right way to structure tests with more than one 
scenario (SSL vs. non-SSL).
   
   If we have to control the complexity, I would prefer to drop 
`testWithBrokerRestart` and keep `MirrorConnectorsIntegrationSSLTest`, as it 
makes sense to run simple validation in SSL setup.
   
   Thanks and would like to hear more thoughts and feedback. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc edited a comment on pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-10-23 Thread GitBox


ning2008wisc edited a comment on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-714955565


   Thanks @mimaison for your high-level advice and detailed review.
   
   (1) I responded to every one of your comments. A "thumb-up" means I made the 
suggested change.
   (2) Regarding the increased complexity, here is my analysis for each test 
case:
(a) `testReplication`: This is an existing test and nothing new is 
added, except checking that topic configs are also mirrored. If we believe the 
topic config validation is trivial, I am more than happy to remove it.
(b) `testReplicationWithEmptyPartition`: This is an existing test and 
nothing new is added.
(c) `testOneWayReplicationWithAutoOffsetSync`: This is an existing test 
and nothing new is added.
(d) `testWithBrokerRestart`: This is a **new** test, which introduces a 
background producer and consumer, called `ThreadedProducer` and 
`ThreadedConsumer`. The purpose of the background producer and consumer is to 
better test the failure case by avoiding the serialized sequence (produce -> broker restart 
-> consume), which cannot reveal that consumers consume the same 
records more than once when a broker failure happens.
   
(e) `MirrorConnectorsIntegrationSSLTest`: This is a **new** test and it 
should be pretty lean and neat.
   
   In addition, the **new** base test class 
`MirrorConnectorsIntegrationBaseTest` follows the existing practice of K-stream 
integration test structure 
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java#L37
 so I assume it is likely the right way to structure tests with more than one 
scenario (SSL vs. non-SSL).
   
   If we have to control the complexity, I would prefer to drop 
`testWithBrokerRestart` and keep `MirrorConnectorsIntegrationSSLTest`, as it 
makes sense to run simple validation in SSL setup.
   
   Thanks and would like to hear more thoughts and feedback. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-10-23 Thread GitBox


ning2008wisc commented on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-714955565


   Thanks @mimaison for your high-level advice and detailed review.
   
   (1) I responded to every one of your comments. A "thumb-up" means I made the 
suggested change.
   (2) Regarding the increased complexity, here is my analysis for each test 
case:
(a) `testReplication`: This is an existing test and nothing new is 
added, except checking that topic configs are also mirrored. If the topic config 
validation is trivial, I am happy to remove it.
(b) `testReplicationWithEmptyPartition`: This is an existing test and 
nothing new is added.
(c) `testOneWayReplicationWithAutoOffsetSync`: This is an existing test 
and nothing new is added.
(d) `testWithBrokerRestart`: This is a **new** test, which introduces a 
background producer and consumer, called `ThreadedProducer` and 
`ThreadedConsumer`. The purpose of the background producer and consumer is to 
better test the failure case and avoid serialization (produce -> broker restart 
-> consume) by decoupling the producer thread, embedded Kafka, and consumer 
thread.
(e) `MirrorConnectorsIntegrationSSLTest`: This is a **new** test and it 
should be pretty lean and neat.
   
   In addition, the **new** base test class 
`MirrorConnectorsIntegrationBaseTest` follows the existing practice of K-stream 
integration test structure 
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java#L37
 so I assume it is likely the right structure to extend from a base class.
   
   If we have to control the complexity, I would prefer to drop 
`testWithBrokerRestart` and keep `MirrorConnectorsIntegrationSSLTest`, as it 
makes sense to run simple validation in SSL setup.
   
   Thanks and would like to hear more thoughts and feedback. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2020-10-23 Thread Cheng Tan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219491#comment-17219491
 ] 

Cheng Tan edited comment on KAFKA-8733 at 10/23/20, 6:22 AM:
-

[~flavr] 

What would happen if you increase the lag timeout to a fairly large value so 
that the leaders do not remove replicas from the ISR so aggressively? I know 
this may seem like a simple fix, but we are trying to evaluate it.
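
A minimal broker-configuration sketch of the change being discussed, assuming it is set in server.properties; the value is purely illustrative, not a recommendation:

{code}
# Raise the follower lag timeout so the leader shrinks the ISR less aggressively.
# 120000 ms is an arbitrary example value.
replica.lag.time.max.ms=120000
{code}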


was (Author: d8tltanc):
[~flavr]  increasing the lag timeout to a fairly large value so that the 
leaders do not remove replicas from the ISR so aggressively

> Offline partitions occur when leader's disk is slow in reads while responding 
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.2, 2.4.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We found offline partitions issue multiple times on some of the hosts in our 
> clusters. After going through the broker logs and hosts’s disk stats, it 
> looks like this issue occurs whenever the read/write operations take more 
> time on that disk. In a particular case where read time is more than the 
> replica.lag.time.max.ms, follower replicas will be out of sync as their 
> earlier fetch requests are stuck while reading the local log and their fetch 
> status is not yet updated as mentioned in the below code of `ReplicaManager`. 
> If there is an issue in reading the data from the log for a duration more 
> than replica.lag.time.max.ms then all the replicas will be out of sync and 
> partition becomes offline if min.isr.replicas > 1 and unclean.leader.election 
> is false.
>  
> {code:java}
> def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
>   val result = readFromLocalLog( // this call took more than 
> `replica.lag.time.max.ms`
>   replicaId = replicaId,
>   fetchOnlyFromLeader = fetchOnlyFromLeader,
>   readOnlyCommitted = fetchOnlyCommitted,
>   fetchMaxBytes = fetchMaxBytes,
>   hardMaxBytesLimit = hardMaxBytesLimit,
>   readPartitionInfo = fetchInfos,
>   quota = quota,
>   isolationLevel = isolationLevel)
>   if (isFromFollower) updateFollowerLogReadResults(replicaId, result). // 
> fetch time gets updated here, but mayBeShrinkIsr should have been already 
> called and the replica is removed from isr
>  else result
>  }
> val logReadResults = readFromLog()
> {code}
> Attached the graphs of disk weighted io time stats when this issue occurred.
> I will raise [KIP-501|https://s.apache.org/jhbpn] describing options on how 
> to handle this scenario.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2020-10-23 Thread Cheng Tan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219491#comment-17219491
 ] 

Cheng Tan commented on KAFKA-8733:
--

[~flavr]  increasing the lag timeout to a fairly large value so that the 
leaders do not remove replicas from the ISR so aggressively

> Offline partitions occur when leader's disk is slow in reads while responding 
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.2, 2.4.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We found offline partitions issue multiple times on some of the hosts in our 
> clusters. After going through the broker logs and hosts’s disk stats, it 
> looks like this issue occurs whenever the read/write operations take more 
> time on that disk. In a particular case where read time is more than the 
> replica.lag.time.max.ms, follower replicas will be out of sync as their 
> earlier fetch requests are stuck while reading the local log and their fetch 
> status is not yet updated as mentioned in the below code of `ReplicaManager`. 
> If there is an issue in reading the data from the log for a duration more 
> than replica.lag.time.max.ms then all the replicas will be out of sync and 
> partition becomes offline if min.isr.replicas > 1 and unclean.leader.election 
> is false.
>  
> {code:java}
> def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
>   val result = readFromLocalLog( // this call took more than 
> `replica.lag.time.max.ms`
>   replicaId = replicaId,
>   fetchOnlyFromLeader = fetchOnlyFromLeader,
>   readOnlyCommitted = fetchOnlyCommitted,
>   fetchMaxBytes = fetchMaxBytes,
>   hardMaxBytesLimit = hardMaxBytesLimit,
>   readPartitionInfo = fetchInfos,
>   quota = quota,
>   isolationLevel = isolationLevel)
>   if (isFromFollower) updateFollowerLogReadResults(replicaId, result). // 
> fetch time gets updated here, but mayBeShrinkIsr should have been already 
> called and the replica is removed from isr
>  else result
>  }
> val logReadResults = readFromLog()
> {code}
> Attached the graphs of disk weighted io time stats when this issue occurred.
> I will raise [KIP-501|https://s.apache.org/jhbpn] describing options on how 
> to handle this scenario.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)