[jira] [Commented] (KAFKA-10431) ProducerPerformance with payloadFile arg: add support for sequential or random outputs

2020-08-26 Thread Zaahir Laher (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17185615#comment-17185615 ]

Zaahir Laher commented on KAFKA-10431:
--

Yes, so basically the payloads from the file will be sent to the topic 
sequentially from top to bottom.

> ProducerPerformance with payloadFile arg: add support for sequential or 
> random outputs
> --
>
> Key: KAFKA-10431
> URL: https://issues.apache.org/jira/browse/KAFKA-10431
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 2.5.1
>Reporter: Zaahir Laher
>Priority: Minor
>
> When using ProducerPerformance with the --payloadFile argument and a file 
> with multiple payloads (i.e. the default is one payload per line), 
> ProducerPerformance randomly chooses payloads from the file. 
> This could result in the same payload being sent repeatedly, which may not be 
> the desired result in some cases. 
> It would be useful to also have another argument that allows for sequential 
> payload submission if required. If left blank this arg would default to false 
> (i.e. default random selection).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] LMnet commented on a change in pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)

2020-08-26 Thread GitBox


LMnet commented on a change in pull request #8955:
URL: https://github.com/apache/kafka/pull/8955#discussion_r478127423



##########
File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala
##########
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. 
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.serialization
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.UUID
+
+import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer, Serdes => JSerdes}
+import org.apache.kafka.streams.kstream.WindowedSerdes
+
+object Serdes extends LowPrioritySerdes {
+  implicit def stringSerde: Serde[String] = JSerdes.String()
+  implicit def longSerde: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
+  implicit def javaLongSerde: Serde[java.lang.Long] = JSerdes.Long()
+  implicit def byteArraySerde: Serde[Array[Byte]] = JSerdes.ByteArray()
+  implicit def bytesSerde: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes()
+  implicit def byteBufferSerde: Serde[ByteBuffer] = JSerdes.ByteBuffer()
+  implicit def shortSerde: Serde[Short] = JSerdes.Short().asInstanceOf[Serde[Short]]
+  implicit def javaShortSerde: Serde[java.lang.Short] = JSerdes.Short()
+  implicit def floatSerde: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
+  implicit def javaFloatSerde: Serde[java.lang.Float] = JSerdes.Float()
+  implicit def doubleSerde: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
+  implicit def javaDoubleSerde: Serde[java.lang.Double] = JSerdes.Double()
+  implicit def intSerde: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
+  implicit def javaIntegerSerde: Serde[java.lang.Integer] = JSerdes.Integer()
+  implicit def uuidSerde: Serde[UUID] = JSerdes.UUID()
+
+  implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.TimeWindowedSerde[T] =
+    new WindowedSerdes.TimeWindowedSerde[T](tSerde)

Review comment:
   I got acquainted with KIP-659 and in my opinion, there are a few ways to handle this problem:
   1. Delete `timeWindowedSerde` from the new `Serdes` object. Because it's new code, we would not have any compatibility issues. Also, we could add a note about this serde to the deprecation message.
   2. Add another implicit requirement to `timeWindowedSerde`, for example `implicit streamsConfig: Properties`, like you suggested. But `Properties` is too wide and not a specific type. I personally don't like this option; I think it could become one more Kafka pitfall.
   3. Remove the implicit modifier from `timeWindowedSerde`. We could make it a regular function that requires the streams config explicitly.
   
   I prefer the third option.
   
   A bit off topic, but I find the whole idea of serdes configurable through config confusing. I have to describe serdes in one place, but configure them in some other place. I would prefer a system with immutable, non-configurable serdes plus an extra configuration layer on top. Serdes mutability already causes some difficulties, and that's why in the `Serdes` object all instances are `def`s, not `val`s.
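   For illustration, a minimal sketch of what option 3 could look like, assuming the two-argument `TimeWindowedSerde(inner, windowSize)` constructor (name and placement are illustrative, not the PR's code):
   ```scala
   import org.apache.kafka.common.serialization.Serde
   import org.apache.kafka.streams.kstream.WindowedSerdes
   
   object WindowedSerdeFactories {
     // No implicit modifier: the caller must ask for this serde explicitly
     // and pass the window size, instead of relying on config-driven resolution.
     def timeWindowedSerde[T](windowSizeMs: Long)(implicit tSerde: Serde[T]): WindowedSerdes.TimeWindowedSerde[T] =
       new WindowedSerdes.TimeWindowedSerde[T](tSerde, windowSizeMs)
   }
   ```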





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-26 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-681375395


   > This issue will be resolved if we complete those delayed operations due to groupManager.storeGroup() elsewhere without holding any locks.
   
   So ```DelayedJoin``` DOES NOT complete any delayed requests in this path anymore, and we expect that someone who doesn't hold the lock will complete them?







[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-26 Thread GitBox


junrao commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-681361030


   The following is my understanding. The current PR introduces a new deadlock through the following paths.
   
   path 1
   hold group lock -> joinPurgatory.tryCompleteElseWatch(delayedJoin) -> watchForOperation (now delayedJoin is visible to other threads) -> operation.maybeTryComplete() -> hold delayedJoin.lock
   
   path 2
   delayedJoin.maybeTryComplete -> hold delayedJoin.lock -> tryComplete() -> hold group lock
   
   The existing code doesn't have this deadlock since (1) delayedJoin.lock is the same as the group lock held in the caller and (2) a delayed join operation is registered under the group key (so each time we check completeness for a group key, only one delayed join operation is affected). By switching back to this code, we avoid the new deadlock.
   
   The existing code has a different deadlock issue: groupManager.storeGroup() in GroupCoordinator.onCompleteJoin may need to complete other delayed operations and potentially acquire a different group lock while already holding a group lock. This issue will be resolved if we complete those delayed operations due to groupManager.storeGroup() elsewhere without holding any locks.
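   For illustration, the two paths reduce to the classic inverted lock-order pattern; a self-contained sketch with stand-in locks, not Kafka code:
   ```scala
   object DeadlockSketch {
     private val groupLock = new Object       // stands in for the group lock
     private val delayedJoinLock = new Object // stands in for delayedJoin.lock
   
     // path 1: group lock -> delayedJoin.lock
     def tryCompleteElseWatch(): Unit = groupLock.synchronized {
       delayedJoinLock.synchronized { /* operation.maybeTryComplete() */ }
     }
   
     // path 2: delayedJoin.lock -> group lock (opposite order, so two threads
     // running the two paths concurrently can deadlock)
     def maybeTryComplete(): Unit = delayedJoinLock.synchronized {
       groupLock.synchronized { /* tryComplete() */ }
     }
   }
   ```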
   







[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-26 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r478069145



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent from the controller
+ */
+trait AlterIsrManager {
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr, callback: Errors => Unit)
+
+class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager,
+                          val zkClient: KafkaZkClient,
+                          val scheduler: Scheduler,
+                          val time: Time,
+                          val brokerId: Int) extends AlterIsrManager with Logging with KafkaMetricsGroup {
+
+  private val unsentIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+    unsentIsrUpdates synchronized {
+      unsentIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+      lastIsrChangeMs.set(time.milliseconds)
+      // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening
+      // in fast succession
+      if (scheduledRequest.isEmpty) {
+        scheduledRequest = Some(scheduler.schedule("propagate-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS))
+      }
+    }
+  }
+
+  override def clearPending(topicPartition: TopicPartition): Unit = {
+    unsentIsrUpdates synchronized {
+      // when we get a new LeaderAndIsr, we clear out any pending requests
+      unsentIsrUpdates.remove(topicPartition)
+    }
+  }
+
+  private def propagateIsrChanges(): Unit = {
+    val now = time.milliseconds()
+    unsentIsrUpdates synchronized {
+      if (unsentIsrUpdates.nonEmpty) {
+        val brokerEpoch: Long = zkClient.getBrokerEpoch(brokerId) match {

Review comment:
   I wasn't too happy about this. Is there another way to get the current 
broker epoch? As I understand it, the broker epoch can change during the 
lifecycle of a broker. 









[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-26 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r478067221



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent from the controller
+ */
+trait AlterIsrManager {
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def clearPending(topicPartition: TopicPartition): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr, callback: Errors => Unit)
+
+class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager,
+                          val zkClient: KafkaZkClient,
+                          val scheduler: Scheduler,
+                          val time: Time,
+                          val brokerId: Int) extends AlterIsrManager with Logging with KafkaMetricsGroup {
+
+  private val unsentIsrUpdates: mutable.Map[TopicPartition, AlterIsrItem] = new mutable.HashMap[TopicPartition, AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  @volatile private var scheduledRequest: Option[ScheduledFuture[_]] = None
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+    unsentIsrUpdates synchronized {
+      unsentIsrUpdates(alterIsrItem.topicPartition) = alterIsrItem
+      lastIsrChangeMs.set(time.milliseconds)
+      // Rather than sending right away, we'll delay at most 50ms to allow for batching of ISR changes happening
+      // in fast succession
+      if (scheduledRequest.isEmpty) {

Review comment:
   I think that sounds pretty reasonable. Would we need any kind of timeout 
at this layer, or just rely on the underlying channel to provide timeouts?









[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-26 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-681318790


   > We keep DelayedJoin as it is, still passing in the group lock to DelayedOperation to avoid deadlocks due to two levels of locking.
   
   Just to double check: we use a separate thread to handle ```groupManager.storeGroup``` and ```joinPurgatory.tryCompleteElseWatch``` in ```GroupCoordinator.onCompleteJoin```, right? ```GroupCoordinator.onCompleteJoin``` is called by ```DelayedJoin.onComplete``` only, and it is possible that a group lock is already held.







[GitHub] [kafka] showuon commented on a change in pull request #9179: KAFKA-10390: Remove ignore case option when grep process info to be more specific

2020-08-26 Thread GitBox


showuon commented on a change in pull request #9179:
URL: https://github.com/apache/kafka/pull/9179#discussion_r477984582



##########
File path: bin/kafka-server-stop.sh
##########
@@ -21,7 +21,7 @@ if [[ $(uname -s) == "OS/390" ]]; then
     fi
     PIDS=$(ps -A -o pid,jobname,comm | grep -i $JOBNAME | grep java | grep -v grep | awk '{print $1}')
 else
-    PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
+    PIDS=$(ps ax | grep 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')

Review comment:
   @lbradstreet, yes, that'd be better! Thanks for the suggestion. I've updated it in this commit: https://github.com/apache/kafka/pull/9179/commits/99bf042dc346cca5061e7f0bbcb30b1d736162f6. Thanks.









[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-26 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r477618083



##########
File path: clients/src/main/resources/common/message/AlterIsrResponse.json
##########
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "AlterIsrResponse",
+  "validVersions": "0",
+  "flexibleVersions": "none",
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+    { "name": "ErrorCode", "type": "int16", "versions": "0+",
+      "about": "The top level response error code" },
+    { "name": "Topics", "type": "[]AlterIsrResponseTopics", "versions": "0+", "fields": [

Review comment:
   nit: I think `AlterIsrResponseTopics` should be singular (similarly for 
other arrays in both of these schemas). 
   
   Also, I wonder if it's reasonable to leave off the `AlterIsr` prefix. We 
could access it as `AlterIsrResponse.TopicData` or something like that.

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1210,19 +1243,66 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def expandIsr(newIsr: Set[Int]): Unit = {
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion)
+  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
+    if (useAlterIsr) {
+      expandIsrWithAlterIsr(newInSyncReplica)
+    } else {
+      expandIsrWithZk(newInSyncReplica)
+    }
+  }
+
+  private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = {
+    // This is called from maybeExpandIsr which holds the ISR write lock
+    if (pendingInSyncReplicaIds.isEmpty) {
+      // When expanding the ISR, we can safely assume the new replica will make it into the ISR since this puts us in
+      // a more constrained state for advancing the HW.
+      val newIsr = inSyncReplicaIds + newInSyncReplica
+      pendingInSyncReplicaIds = Some(newIsr)
+      debug(s"Adding new in-sync replica $newInSyncReplica. Pending ISR updated to [${newIsr.mkString(",")}] for $topicPartition")
+      alterIsr(newIsr)
+    } else {
+      debug(s"ISR update in-flight, not adding new in-sync replica $newInSyncReplica for $topicPartition")

Review comment:
   Maybe trace would be better? This could get verbose while we have an 
inflight AlterIsr.

##########
File path: clients/src/main/resources/common/message/AlterIsrRequest.json
##########
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "AlterIsrRequest",
+  "validVersions": "0",
+  "flexibleVersions": "none",

Review comment:
   We may as well add flexible version support for the request and response.

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1234,6 +1314,36 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  private def alterIsr(newIsr: Set[Int]): Unit = {
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion)
+    alterIsrManager.enqueueIsrUpdate(AlterIsrItem(topicPartition, newLeaderAndIsr, {
+      inWriteLock(leaderIsrUpdateLock) {
+        case Errors.NONE =>
+          debug(s"Controller accepted 

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-26 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r477784084



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) {
 
                 if (endTime < timestamp) {
                     leftWinAgg = next.value;
+                    // store the combined window if it is found so that a right window can be created for
+                    // the combined window's max record, as needed
                     if (isLeftWindow(next) || endTime == windows.timeDifferenceMs()) {
                         latestLeftTypeWindow = next;
                     }
                 } else if (endTime == timestamp) {
                     leftWinAlreadyCreated = true;
+                    // if current record's left window is the combined window, need to check later if there is a
+                    // record that needs a right window within the combined window
+                    if (endTime == windows.timeDifferenceMs()) {
+                        latestLeftTypeWindow = next;
+                    }

Review comment:
   We only need to check if the previous right window needs to be created 
if the current record's left window was _not_ previously a left-type window, ie 
the max_timestamp < timestamp. Otherwise, we'd have already created any windows 
since that implies we processed a record with this exact timestamp already. 
   So then why do we set `latestLeftTypeWindow = next` ? It seems like it's 
possible that this actually isn't a left-type window, in which case we 
shouldn't overwrite any existing value for `latestLeftTypeWindow`.
   On the other hand, if it actually _is_ a left type window, then that means 
we don't need to create the previous record's right window so we should just 
set it to null. But that seems to apply regardless of whether this is the early 
record combined window or not? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) {
 
                 if (endTime < timestamp) {
                     leftWinAgg = next.value;
+                    // store the combined window if it is found so that a right window can be created for
+                    // the combined window's max record, as needed

Review comment:
   This comment kind of comes out of nowhere, since there's no concept of the "combined window" outside of `processEarly`. Maybe you can just add a quick mention of what it is and that it's for early records.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -174,12 +181,11 @@ public void processInOrder(final K key, final V value, final long timestamp) {
                     }
                 }
             }
-
             //create right window for previous record
             if (latestLeftTypeWindow != null) {
-                final long leftWindowEnd = latestLeftTypeWindow.key.window().end();
-                final long rightWinStart = leftWindowEnd == windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : leftWindowEnd + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
+                final long previousRecord = latestLeftTypeWindow.key.window().end();
+                final long rightWinStart = previousRecord == windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : previousRecord + 1;

Review comment:
   I'm having trouble wrapping my head around this line. Why would we 
create a right window at `latestLeftTypeWindow.maxTimestamp + 1` if the 
previous record was at `timeDifferenceMs`? Wouldn't we have created the right 
window for whatever is at `latestLeftTypeWindow.maxTimestamp + 1` when we 
processed the `previousRecord`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, final long timestamp, fina
                 final long startTime = next.key.window().start();
                 final long endTime = startTime + windows.timeDifferenceMs();
 
-                if (endTime == windows.timeDifferenceMs()) {
+                if (startTime == 0) {
                     combinedWindow = next;
-                } else if (endTime > timestamp && startTime <= timestamp) {
+                } else if (endTime >= timestamp && startTime <= timestamp) {
                     rightWinAgg = next.value;
 

[jira] [Commented] (KAFKA-10431) ProducerPerformance with payloadFile arg: add support for sequential or random outputs

2020-08-26 Thread huxihx (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17185531#comment-17185531 ]

huxihx commented on KAFKA-10431:


Seems adding a new option to ProducerPerformance requires a KIP. I am thinking 
of replacing the random choosing with sequential consumption. Would that be a 
better alternative?
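
For reference, a sketch of the two selection strategies under discussion (illustrative Scala, not the actual ProducerPerformance code, which is Java):

{code:scala}
import java.nio.file.{Files, Paths}
import java.util.concurrent.ThreadLocalRandom
import scala.jdk.CollectionConverters._

object PayloadSelection {
  def payloads(path: String): Vector[String] =
    Files.readAllLines(Paths.get(path)).asScala.toVector

  // Current behavior: pick a random payload per record (repeats are possible).
  def randomPayload(all: Vector[String]): String =
    all(ThreadLocalRandom.current.nextInt(all.size))

  // Proposed behavior: cycle through the file from top to bottom.
  def sequentialPayload(all: Vector[String], recordIndex: Long): String =
    all((recordIndex % all.size).toInt)
}
{code}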

> ProducerPerformance with payloadFile arg: add support for sequential or 
> random outputs
> --
>
> Key: KAFKA-10431
> URL: https://issues.apache.org/jira/browse/KAFKA-10431
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 2.5.1
>Reporter: Zaahir Laher
>Priority: Minor
>
> When using ProducerPerformance with the --payloadFile argument and a file 
> with multiple payloads (i.e. the default is one payload per line), 
> ProducerPerformance randomly chooses payloads from the file. 
> This could result in the same payload being sent repeatedly, which may not be 
> the desired result in some cases. 
> It would be useful to also have another argument that allows for sequential 
> payload submission if required. If left blank this arg would default to false 
> (i.e. default random selection).





[GitHub] [kafka] huxihx commented on pull request #9218: MINOR: Fix shouldNotResetEpochHistoryHeadIfUndefinedPassed

2020-08-26 Thread GitBox


huxihx commented on pull request #9218:
URL: https://github.com/apache/kafka/pull/9218#issuecomment-681198703


   @ijuma Please review this patch. Thanks.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-08-26 Thread GitBox


ableegoldman commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r477663328



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
         }
 
         private void recordRebalanceFailure() {
-            state = MemberState.UNJOINED;

Review comment:
   Did you mean to say `recordRebalanceFailure` or is this comment just out 
of date after the latest changes?

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
           responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
         case CompletingRebalance =>
-          responseCallback(Errors.REBALANCE_IN_PROGRESS)
+          // consumers may start sending heartbeat after join-group response, in which case
+          // we should treat them as normal hb request and reset the timer
+          val member = group.get(memberId)

Review comment:
   Wait, so before this the coordinator wouldn't complete the current 
heartbeat? Doesn't that mean that heartbeating is pointless until the rebalance 
completes? Obviously that doesn't line up with my observations since members 
were clearly getting kicked from the group before the rebalance had completed, 
so I must be missing something here

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -497,40 +501,18 @@ private synchronized void resetStateAndRejoin() {
         joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
             @Override
             public void onSuccess(ByteBuffer value) {
-                // handle join completion in the callback so that the callback will be invoked

Review comment:
   I assume you mean the JoinGroup response handler  

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int,
           responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
         case CompletingRebalance =>
-          responseCallback(Errors.REBALANCE_IN_PROGRESS)

Review comment:
   Honestly, if we have to keep handling the `REBALANCE_IN_PROGRESS` error 
on the client side anyway, then maybe it's best to keep things simple and just 
continue to send this in the response. Otherwise it just seems like asking for 
trouble if we have to consider different possible responses depending on which 
version of the broker the client is talking to.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -446,14 +453,15 @@ boolean joinGroupIfNeeded(final Timer timer) {
                         resetJoinGroupFuture();
                         needsJoinPrepare = true;
                     } else {
-                        log.info("Generation data was cleared by heartbeat thread. Initiating rejoin.");
+                        log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +
+                                 "the rebalance callback is triggered, marking this rebalance as failed and retry",
+                                 generation, state);
                         resetStateAndRejoin();
                         resetJoinGroupFuture();
-                        return false;
                     }
                 } else {
                     final RuntimeException exception = future.exception();
-                    log.info("Join group failed with {}", exception.toString());
+                    log.info("Rebalance failed with {}", exception.toString());

Review comment:
   Can we still specify that we failed during the JoinGroup? eg `Rebalance 
failed on JoinGroup with {}` or something









[GitHub] [kafka] guozhangwang commented on a change in pull request #9222: KAFKA-10437: Implement test-utils and StateStore changes for KIP-478

2020-08-26 Thread GitBox


guozhangwang commented on a change in pull request #9222:
URL: https://github.com/apache/kafka/pull/9222#discussion_r477676262



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * Processor context interface.
+ */
+public interface StateStoreContext {
+
+    /**
+     * Returns the application id.
+     *
+     * @return the application id
+     */
+    String applicationId();

Review comment:
   Are appId / taskId / metrics / appConfigs only needed for state store 
context? Is it possible that they would also be used within Processor?









[GitHub] [kafka] jthompson6 commented on pull request #9216: KAFKA-10428: Fix schema for header conversion in MirrorSourceTask.

2020-08-26 Thread GitBox


jthompson6 commented on pull request #9216:
URL: https://github.com/apache/kafka/pull/9216#issuecomment-681165609


   I see the header converter class is configurable in https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L90, but it appears to be a whole-cluster setting, when really we want to set it only for this connector. 
   
   Do you think it is a bug that SimpleHeaderConverter#fromConnectHeader transforms a byte[] header, even if the schema is BYTES? Maybe that is a question for @rhauch ?







[jira] [Commented] (KAFKA-10410) OnRestoreStart disappeared from StateRestoreCallback in 2.6.0 and reappeared in a useless place

2020-08-26 Thread Sophie Blee-Goldman (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17185511#comment-17185511 ]

Sophie Blee-Goldman commented on KAFKA-10410:
-

It's hard to say without knowing specifically what happens in your 
StateRestoreCallback, but handling transactions with the StateRestoreListener 
sounds potentially unsafe: even before 2.6, `onRestoreEnd` was never guaranteed 
to be called when a store stopped restoring, only when the restoration actually 
completed (onRestoreComplete would have been a better name). So it was always 
possible for onRestoreStart to be called without onRestoreEnd ever being 
called, or for onRestoreStart to be called multiple times in a row, etc.

That said, can't you just register the callback of each store+partition with 
your global restore listener, then use the handle on the callback to do 
whatever cleanup you need for that particular store when onRestoreEnd is 
called for it? It's certainly a bit roundabout, but definitely still possible.
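
A rough sketch of that wiring (hypothetical names; only the StateRestoreListener interface and its callback signatures are Kafka APIs):

{code:scala}
import java.util.concurrent.ConcurrentHashMap
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.streams.processor.StateRestoreListener

// Hypothetical per-store hook type; not a Kafka API.
trait RestoreHooks {
  def started(partition: TopicPartition): Unit
  def finished(partition: TopicPartition, totalRestored: Long): Unit
}

class DispatchingRestoreListener extends StateRestoreListener {
  private val hooks = new ConcurrentHashMap[String, RestoreHooks]()

  // Each store registers its own hooks under its store name.
  def register(storeName: String, h: RestoreHooks): Unit = hooks.put(storeName, h)

  override def onRestoreStart(tp: TopicPartition, storeName: String,
                              startingOffset: Long, endingOffset: Long): Unit =
    Option(hooks.get(storeName)).foreach(_.started(tp))

  override def onBatchRestored(tp: TopicPartition, storeName: String,
                               batchEndOffset: Long, numRestored: Long): Unit = ()

  override def onRestoreEnd(tp: TopicPartition, storeName: String,
                            totalRestored: Long): Unit =
    Option(hooks.get(storeName)).foreach(_.finished(tp, totalRestored))
}
{code}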

> OnRestoreStart disappeared from StateRestoreCallback  in 2.6.0 and reappeared 
> in a useless place
> 
>
> Key: KAFKA-10410
> URL: https://issues.apache.org/jira/browse/KAFKA-10410
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Mark Shelton
>Priority: Blocker
>
> In version 2.5.0 and earlier there are "onRestoreStart" and "onRestoreEnd" 
> methods on StateRestoreCallback.
> Version 2.6.0 removed these calls and put them into StateRestoreListener and 
> requires "streaming.setGlobalStateRestoreListener".
> This makes it impossible for the actual StateRestoreCallback implementation 
> to receive the start and end indication and is blocking me from moving to 
> 2.6.0.
> See:
> [https://kafka.apache.org/25/javadoc/index.html?org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.html]
>  
> Related JIRA:
> https://issues.apache.org/jira/browse/KAFKA-4322 





[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-08-26 Thread GitBox


mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r477622476



##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -123,7 +123,7 @@ public void shouldAllowJoinMaterializedFilteredKTable() {
 
         assertThat(
             topology.stateStores().size(),
-            equalTo(1));
+            equalTo(2));

Review comment:
   @guozhangwang Atm, when `enableSendingOldValues` is set and the upstream store is not materialized already, we enforce a materialization. Thus, enabling `enableSendingOldValues` in `KTable#filter()` would be a breaking change, as we would start to materialize state that did not exist before if one upgrades a topology.
   
   Instead, we want to say: _iff_ the upstream store exists, please send me the old value, but if the upstream store does not exist, it's ok to just send `old=null` and not force a materialization.









[GitHub] [kafka] mjsax commented on pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies

2020-08-26 Thread GitBox


mjsax commented on pull request #9064:
URL: https://github.com/apache/kafka/pull/9064#issuecomment-681153031


   Compile error:
   ```
   /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:1989: error: constructor StreamsMetricsImpl in class StreamsMetricsImpl cannot be applied to given types;
   20:24:26 final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", StreamsConfig.METRICS_LATEST);
   20:24:26^
   20:24:26   required: Metrics,String,String,Time
   20:24:26   found: Metrics,String,String
   20:24:26   reason: actual and formal argument lists differ in length
   20:24:27 1 error
   ```
   
   Seems you need to rebase?







[GitHub] [kafka] mjsax commented on a change in pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers

2020-08-26 Thread GitBox


mjsax commented on a change in pull request #9000:
URL: https://github.com/apache/kafka/pull/9000#discussion_r477620408



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamUtil.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+
+/**
+ * Shared functions to handle verifications of a valid {@link org.apache.kafka.streams.kstream.KStream}.
+ */
+final class KStreamUtil {
+
+    private KStreamUtil() {}
+
+    /**
+     * @throws IllegalArgumentException if the same transformer instance is obtained each time
+     */
+    static void checkSupplier(final TransformerSupplier supplier) {

Review comment:
   I am wondering if we actually need three methods? Could we use 
`java.util.function.Supplier` instead (we don't really care about generic types.
   
   To customize the error message we just pass an additional `String` or use 
`supplier.getClass().getName()` ?
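   A sketch of that single-method variant, assuming the check is "two calls to get() must return distinct instances" (per the javadoc above); written in Scala here for brevity, names illustrative:
   ```scala
   import java.util.function.Supplier
   
   object SupplierChecks {
     // One method covers all supplier types; the error message uses the
     // supplier's class name instead of needing a per-type overload.
     def checkSupplier(supplier: Supplier[_]): Unit = {
       val first = supplier.get().asInstanceOf[AnyRef]
       val second = supplier.get().asInstanceOf[AnyRef]
       if (first eq second)
         throw new IllegalArgumentException(
           s"${supplier.getClass.getName}#get() must return a new instance on each call")
     }
   }
   ```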









[GitHub] [kafka] cmccabe merged pull request #9194: KAFKA-10384: Separate converters from generated messages

2020-08-26 Thread GitBox


cmccabe merged pull request #9194:
URL: https://github.com/apache/kafka/pull/9194


   







[GitHub] [kafka] mjsax commented on pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-26 Thread GitBox


mjsax commented on pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#issuecomment-681148623


   Retest this please.







[GitHub] [kafka] mjsax commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-26 Thread GitBox


mjsax commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r477616241



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##########
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException {
         assertEquals(expected, offsets);
     }
 
+    @Test
+    public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException {
+        final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L);
+        stateManager.initialize();
+        stateManager.updateChangelogOffsets(offsets);
+
+        final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");

Review comment:
   Interesting... Thanks for the clarification.









[GitHub] [kafka] mjsax commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-26 Thread GitBox


mjsax commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r477606207



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##########
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException {
         assertEquals(expected, offsets);
     }
 
+    @Test
+    public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException {
+        final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L);
+        stateManager.initialize();
+        stateManager.updateChangelogOffsets(offsets);
+
+        final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
+        file.createNewFile();
+        // set the checkpoint tmp file to read-only to simulate the IOException situation
+        file.setWritable(false);
+
+        try (final LogCaptureAppender appender =
+                 LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) {
+
+            // checkpoint should fail due to the file is readonly
+            stateManager.checkpoint();
+            assertThat(appender.getMessages(), hasItem(containsString(
+                "Failed to write offset checkpoint file to " + checkpointFile.getPath() + " for global stores")));

Review comment:
   Ah. Good point. We can leave as-is.









[GitHub] [kafka] mjsax commented on pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

2020-08-26 Thread GitBox


mjsax commented on pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#issuecomment-681138143


   Retest this please.







[jira] [Updated] (KAFKA-10395) TopologyTestDriver does not work with dynamic topic routing

2020-08-26 Thread Matthias J. Sax (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-10395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-10395:

Fix Version/s: 2.6.1

> TopologyTestDriver does not work with dynamic topic routing
> ---
>
> Key: KAFKA-10395
> URL: https://issues.apache.org/jira/browse/KAFKA-10395
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: test-framework
> Fix For: 2.7.0, 2.6.1
>
>
> The TopologyTestDriver#read(topic) methods all call #getRecordsQueue which 
> checks 
>  
> {code:java}
> final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
> if (outputRecords == null) {
>     if (!processorTopology.sinkTopics().contains(topicName)) {
>         throw new IllegalArgumentException("Unknown topic: " + topicName); 
>     } 
> }
> {code}
> The outputRecordsByTopic map keeps track of all topics that are actually 
> produced to, but obviously doesn't capture any topics that haven't yet 
> received output. The `processorTopology#sinkTopics` is supposed to account 
> for that by checking to make sure the topic is actually registered in the 
> topology, and throw an exception if not in case the user supplied the wrong 
> topic name to read from. 
> Unfortunately the TopicNameExtractor allows for dynamic routing of records to 
> any topic, so the topology isn't aware of all the possible output topics. If 
> trying to read from one of these topics that happens to not have received any 
> output yet, the test will throw the above misleading IllegalArgumentException.
> We could just relax this check, but warning users who may actually have 
> accidentally passed in the wrong topic to read from seems quite useful. A 
> better solution would be to require registering all possible output topics to 
> the TTD up front. This would obviously require a KIP, but it would be a very 
> small one and shouldn't be too much trouble
>  





[GitHub] [kafka] mjsax commented on pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

2020-08-26 Thread GitBox


mjsax commented on pull request #9174:
URL: https://github.com/apache/kafka/pull/9174#issuecomment-681137931


   Cherry-picked to `2.6` branch.







[GitHub] [kafka] mjsax commented on pull request #9146: KAFKA-10316 Updated Kafka Streams upgrade-guide.html

2020-08-26 Thread GitBox


mjsax commented on pull request #9146:
URL: https://github.com/apache/kafka/pull/9146#issuecomment-681131093


   My pleasure :) 







[jira] [Commented] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception

2020-08-26 Thread Gokul Srinivas (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17185482#comment-17185482 ]

Gokul Srinivas commented on KAFKA-10186:


[~mjsax] - I've cleaned up the KIP. Please let me know if it looks sane enough 
so that I can start a discussion on the mailing list.

 

> Aborting transaction with pending data should throw non-fatal exception
> ---
>
> Key: KAFKA-10186
> URL: https://issues.apache.org/jira/browse/KAFKA-10186
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sophie Blee-Goldman
>Assignee: Gokul Srinivas
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> Currently if you try to abort a transaction with any pending (non-flushed) 
> data, the send exception is set to
> {code:java}
>  KafkaException("Failing batch since transaction was aborted"){code}
> This exception type is generally considered fatal, but this is a valid state 
> to be in -- the point of throwing the exception is to alert that the records 
> will not be sent, not that you are in an unrecoverable error state.
> We should throw a different (possibly new) type of exception here to 
> distinguish from fatal and recoverable errors.
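
For illustration, one shape such a distinguishing exception could take; a sketch only, with hypothetical names (the actual type and hierarchy would be settled in the KIP):

{code:scala}
import org.apache.kafka.common.KafkaException

object AbortHandlingSketch {
  // A dedicated subtype lets callers treat "batch failed because the
  // transaction was deliberately aborted" as recoverable rather than fatal.
  class TransactionAbortedSketchException(message: String) extends KafkaException(message)

  // Caller side: the deliberate abort can now be told apart from fatal errors.
  def handleSendError(e: Throwable): Unit = e match {
    case _: TransactionAbortedSketchException => () // expected after abortTransaction()
    case other                                => throw other
  }
}
{code}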





[jira] [Commented] (KAFKA-10410) OnRestoreStart disappeared from StateRestoreCallback in 2.6.0 and reappeared in a useless place

2020-08-26 Thread Mark Shelton (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17185481#comment-17185481 ]

Mark Shelton commented on KAFKA-10410:
--

My implementation of StateRestoreCallback needs to know when the state restore 
starts and when it ends. This is because it needs to handle statistics, 
transaction stuff and cleanup. In version 2.5 and earlier this was easy as the 
methods were already there on {{AbstractNotifyingRestoreCallback}}.

With version 2.6 only the "StateRestoreListener", which is per streams 
instance, receives the "onRestoreStart" and "onRestoreEnd". Since the 
"StateRestoreListener" is per streams instance there is no easy way for it to 
notify any StateRestoreCallback instance(s).

In version 2.5 and earlier the following was available and convenient. But 
since {{AbstractNotifyingRestoreCallback}} is removed, the update to version 
2.6 is a show stopper.

{code:java}
public class MyStateRestoreCallback extends AbstractNotifyingRestoreCallback {

  @Override
  public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
    // called by Kafka Streams
  }

  @Override
  public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
    // called by Kafka Streams
  }

  ...
}
{code}

 

> OnRestoreStart disappeared from StateRestoreCallback  in 2.6.0 and reappeared 
> in a useless place
> 
>
> Key: KAFKA-10410
> URL: https://issues.apache.org/jira/browse/KAFKA-10410
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Mark Shelton
>Priority: Blocker
>
> In version 2.5.0 and earlier there are "onRestoreStart" and "onRestoreEnd" 
> methods on StateRestoreCallback.
> Version 2.6.0 removed these calls and put them into StateRestoreListener and 
> requires "streaming.setGlobalStateRestoreListener".
> This makes it impossible for the actual StateRestoreCallback implementation 
> to receive the start and end indication and is blocking me from moving to 
> 2.6.0.
> See:
> [https://kafka.apache.org/25/javadoc/index.html?org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.html]
>  
> Related JIRA:
> https://issues.apache.org/jira/browse/KAFKA-4322 





[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-26 Thread GitBox


junrao commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-681109748


   @chia7712 : Thanks for the reply. I like your overall idea and I think it 
can be used to solve the problem completely in a simpler way.
   
   1. Instead of at `Partition`, we collect all pending delayed check 
operations in a queue in ReplicaManager. All callers to 
ReplicaManager.appendRecords() are expected to take up to 1 item from that 
queue and check the completeness for all affected partitions, without holding 
any conflicting locks.
   
   2. Most callers to ReplicaManager.appendRecords() are from KafkaApis. We can 
just add the logic to check the ReplicaManager queue at the end of 
KafkaApis.handle(), at which point, no conflicting locks will be held.
   
   3. Another potential caller is the expiration thread in a purgatory. SystemTimer always runs the expiration logic in a separate thread and DelayedOperation.onExpiration() is always called without holding any conflicting lock. So, for those delayed operations using ReplicaManager.appendRecords(), we can pass down a flag to DelayedOperation so that at the end of onExpiration, we check the ReplicaManager queue if the flag is set.
   
   4. We keep `DelayedJoin` as it is, still passing in the group lock to 
DelayedOperation to avoid deadlocks due to two levels of locking.
   
   5. We can still get rid of the `tryLock` logic in DelayedOperation for 
simplification since there is no opportunity for deadlock.
   
   What do you think?
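   
   To make the shape of items 1-3 concrete, here is a rough sketch of the queue idea (Scala; the names are hypothetical, not the actual patch):
   
   ```scala
   import java.util.concurrent.ConcurrentLinkedQueue
   
   // Sketch only: appendRecords() enqueues a deferred "try to complete delayed
   // operations" action instead of running it while conflicting locks are held.
   class ReplicaManagerSketch {
     private val pendingDelayedChecks = new ConcurrentLinkedQueue[() => Unit]()
   
     def appendRecords(/* records, responseCallback, ... */): Unit = {
       // ... append to the local log, possibly while holding partition locks ...
       pendingDelayedChecks.add(() => tryCompleteDelayedRequests())
     }
   
     // Called at the end of KafkaApis.handle(), or from
     // DelayedOperation.onExpiration(), where no conflicting locks are held.
     def completeOnePendingCheck(): Unit = {
       val check = pendingDelayedChecks.poll()
       if (check != null) check()
     }
   
     private def tryCompleteDelayedRequests(): Unit = {
       // check completeness of delayed produce/fetch for the affected partitions
     }
   }
   ```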



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ryannedolan commented on pull request #9216: KAFKA-10428: Fix schema for header conversion in MirrorSourceTask.

2020-08-26 Thread GitBox


ryannedolan commented on pull request #9216:
URL: https://github.com/apache/kafka/pull/9216#issuecomment-681062141


   Thanks for finding this. Would it make sense to implement a new nop 
converter for this purpose? Not sure whether a KIP would be required -- maybe 
it can be private to the mirror package?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jthompson6 commented on pull request #9216: KAFKA-10428: Fix schema for header conversion in MirrorSourceTask.

2020-08-26 Thread GitBox


jthompson6 commented on pull request #9216:
URL: https://github.com/apache/kafka/pull/9216#issuecomment-681056654


   @ryannedolan are you the right person to tag here?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-08-26 Thread GitBox


asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r477485916



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -128,10 +136,23 @@ public void setup() throws InterruptedException {
 backup.kafka().createTopic("primary.test-topic-1", 1);
 backup.kafka().createTopic("heartbeats", 1);
 
-for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", 
"message-1-" + i);
-backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", 
"message-2-" + i);
-}
+// produce to all partitions but the last one

Review comment:
   Good point. I'll look into this. Besides not affecting the other tests, 
it should make it simpler to reason about





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-08-26 Thread GitBox


asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r477485106



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -367,14 +406,37 @@ public void testOneWayReplicationWithAutorOffsetSync1() 
throws InterruptedExcept
 time.sleep(5000);
 
 // create a consumer at backup cluster with same consumer group Id to 
consume old and new topic
-consumer = 
backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-"group.id", "consumer-group-1"), "primary.test-topic-1", 
"primary.test-topic-2");
+consumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, 
"primary.test-topic-1", "primary.test-topic-2");
 
 records = consumer.poll(Duration.ofMillis(500));
 // similar reasoning as above, no more records to consume by the same 
consumer group at backup cluster
 assertEquals("consumer record size is not zero", 0, records.count());
 consumer.close();
+}
+
+private void produceMessages(EmbeddedConnectCluster cluster, String 
topicName, int partitions, String msgPrefix) {
+for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+// produce to all partitions but the last one

Review comment:
   Good catch. Updating.
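   
   For reference, a sketch of the loop under discussion (Scala; the helper names and record count are taken from the diff above as assumptions):
   
   ```scala
   import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster
   
   object ProduceSketch {
     val NUM_RECORDS_PRODUCED = 100 // assumed value for the sketch
   
     // Cycling i over (partitions - 1) leaves the last partition empty,
     // matching the "produce to all partitions but the last one" comment.
     def produceMessages(cluster: EmbeddedConnectCluster, topicName: String,
                         partitions: Int, msgPrefix: String): Unit =
       for (i <- 0 until NUM_RECORDS_PRODUCED)
         cluster.kafka().produce(topicName, i % (partitions - 1), "key", msgPrefix + "-" + i)
   }
   ```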





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9222: KAFKA-10437: Implement test-utils and StateStore changes for KIP-478

2020-08-26 Thread GitBox


vvcephei commented on a change in pull request #9222:
URL: https://github.com/apache/kafka/pull/9222#discussion_r477477190



##
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
##
@@ -36,19 +36,19 @@
 public class WordCountProcessorTest {
 @Test
 public void test() {
-final MockProcessorContext context = new MockProcessorContext();
+final MockProcessorContext context = new 
MockProcessorContext<>();
 
 // Create, initialize, and register the state store.
 final KeyValueStore store =
 
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("Counts"), 
Serdes.String(), Serdes.Integer())
 .withLoggingDisabled() // Changelog is not supported by 
MockProcessorContext.
 // Caching is disabled by default, but FYI: caching is also 
not supported by MockProcessorContext.
 .build();
-store.init(context, store);
+store.init(context.getStateStoreContext(), store);

Review comment:
   Here's where we're switching contexts to the StateStoreContext to invoke 
the new API.

##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for 
users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * 
+ * The tests for this class 
(org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * 
+ * Note that this class does not take any automated actions (such as firing 
scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link 
Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext implements 
ProcessorContext, RecordCollector.Supplier {
+// Immutable fields 
+private final 

[GitHub] [kafka] mjsax merged pull request #9217: MINOR: fix JavaDoc

2020-08-26 Thread GitBox


mjsax merged pull request #9217:
URL: https://github.com/apache/kafka/pull/9217


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10410) OnRestoreStart disappeared from StateRestoreCallback in 2.6.0 and reappeared in a useless place

2020-08-26 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17185367#comment-17185367
 ] 

Guozhang Wang commented on KAFKA-10410:
---

Not sure if I fully understand what you mean by "GlobalStateRestoreListener 
have a way of obtaining StateRestoreCallback", but just to clarify: the 
"StateRestoreListener" is used for notifications when a restoration is started 
/ ended etc., while the "StateRestoreCallback" is the actual implementation 
that applies the changelog records to the state stores. They should naturally 
be two separate implementations:

1) You specify the per-store "StateRestoreCallback" when you register a store.
2) You specify the global "StateRestoreListener" when you instantiate a streams 
instance.

We do not recommend having just one impl class that extends both interfaces 
moving forward.
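
To make that separation concrete, a minimal sketch (Scala; the wrapper names are made up for illustration):

{code:scala}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.processor.{StateRestoreCallback, StateRestoreListener}

object RestoreHooksSketch {
  // 1) per-store: handed over when the store is registered (e.g. in Processor#init)
  val restoreCallback: StateRestoreCallback = new StateRestoreCallback {
    override def restore(key: Array[Byte], value: Array[Byte]): Unit = {
      // apply one changelog record to the state store
    }
  }

  // 2) per-instance: set once on the KafkaStreams object
  def addListener(streams: KafkaStreams): Unit =
    streams.setGlobalStateRestoreListener(new StateRestoreListener {
      override def onRestoreStart(tp: TopicPartition, store: String, startOffset: Long, endOffset: Long): Unit = ()
      override def onBatchRestored(tp: TopicPartition, store: String, batchEndOffset: Long, numRestored: Long): Unit = ()
      override def onRestoreEnd(tp: TopicPartition, store: String, totalRestored: Long): Unit = ()
    })
}
{code}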

If you can share your current code maybe that can help me better understanding 
your pattern.

> OnRestoreStart disappeared from StateRestoreCallback  in 2.6.0 and reappeared 
> in a useless place
> 
>
> Key: KAFKA-10410
> URL: https://issues.apache.org/jira/browse/KAFKA-10410
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Mark Shelton
>Priority: Blocker
>
> In version 2.5.0 and earlier there are "onRestoreStart" and "onRestoreEnd" 
> methods on StateRestoreCallback.
> Version 2.6.0 removed these calls and put them into StateRestoreListener and 
> requires "streaming.setGlobalStateRestoreListener".
> This makes it impossible for the actual StateRestoreCallback implementation 
> to receive the start and end indication and is blocking me from moving to 
> 2.6.0.
> See:
> [https://kafka.apache.org/25/javadoc/index.html?org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.html]
>  
> Related JIRA:
> https://issues.apache.org/jira/browse/KAFKA-4322 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ning2008wisc opened a new pull request #9224: improve MM2 unit tests

2020-08-26 Thread GitBox


ning2008wisc opened a new pull request #9224:
URL: https://github.com/apache/kafka/pull/9224


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9207: Minor remove semicolon

2020-08-26 Thread GitBox


bbejeck commented on pull request #9207:
URL: https://github.com/apache/kafka/pull/9207#issuecomment-681005444


   Tests seem to be hung, retesting
   
   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10434) Remove deprecated methods on WindowStore

2020-08-26 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17185325#comment-17185325
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-10434:
--

It is also kind of weird to have Instant-based methods on WindowStore but only 
long-based methods in SessionStore. Regardless of whether we un-deprecate the 
long-based methods, we can consider aligning both interfaces.

> Remove deprecated methods on WindowStore
> 
>
> Key: KAFKA-10434
> URL: https://issues.apache.org/jira/browse/KAFKA-10434
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> From [https://github.com/apache/kafka/pull/9138#discussion_r474985997] and 
> [https://github.com/apache/kafka/pull/9138#discussion_r474995606] :
> WindowStore contains ReadOnlyWindowStore methods.
> We could consider:
>  * Moving read methods from WindowStore to ReadOnlyWindowStore and/or
>  * Consider removing long based methods



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jeqo commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-08-26 Thread GitBox


jeqo commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r477436882



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##
@@ -416,26 +552,43 @@ private long currentSegmentLastTime() {
 }
 
 private void getNextSegmentIterator() {
-++currentSegmentId;
-lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, 
maxObservedTimestamp.get()));
+if (forward) {
+++currentSegmentId;
+lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, 
maxObservedTimestamp.get()));
 
-if (currentSegmentId > lastSegmentId) {
-current = null;
-return;
-}
+if (currentSegmentId > lastSegmentId) {
+current = null;
+return;
+}
 
-setCacheKeyRange(currentSegmentBeginTime(), 
currentSegmentLastTime());
+setCacheKeyRange(currentSegmentBeginTime(), 
currentSegmentLastTime());
 
-current.close();
-current = context.cache().range(cacheName, cacheKeyFrom, 
cacheKeyTo);
+current.close();
+
+current = context.cache().range(cacheName, cacheKeyFrom, 
cacheKeyTo);
+} else {
+--currentSegmentId;
+//lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, 
maxObservedTimestamp.get()));

Review comment:
   actually it could be removed: `lastSegmentId` should be stable going 
backwards based on `timeFrom`.
   
   On the `if` branch, I assume that `lastSegmentId` could change in between 
iterations if `maxObservedTimestamp` is updated, right?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

2020-08-26 Thread GitBox


vvcephei commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r477424578



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##
@@ -396,34 +398,65 @@ private String cacheSensorPrefix(final String threadId, 
final String taskId, fin
 + SENSOR_PREFIX_DELIMITER + SENSOR_CACHE_LABEL + 
SENSOR_PREFIX_DELIMITER + cacheName;
 }
 
-public final Sensor storeLevelSensor(final String threadId,
- final String taskId,
+public final Sensor storeLevelSensor(final String taskId,
  final String storeName,
  final String sensorName,
- final Sensor.RecordingLevel 
recordingLevel,
+ final RecordingLevel recordingLevel,
  final Sensor... parents) {
-final String key = storeSensorPrefix(threadId, taskId, storeName);
-synchronized (storeLevelSensors) {
-final String fullSensorName = key + SENSOR_NAME_DELIMITER + 
sensorName;
-final Sensor sensor = metrics.getSensor(fullSensorName);
-if (sensor == null) {
+final String key = storeSensorPrefix(Thread.currentThread().getName(), 
taskId, storeName);
+final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
+return Optional.ofNullable(metrics.getSensor(fullSensorName))
+.orElseGet(() -> {
 storeLevelSensors.computeIfAbsent(key, ignored -> new 
LinkedList<>()).push(fullSensorName);
 return metrics.sensor(fullSensorName, recordingLevel, parents);
-} else {
-return sensor;
-}
+});

Review comment:
   I'm still mildly concerned about walking back the synchronization here, 
but I can't think of a realistic scenario in which we'd get a concurrency bug. 
Then again, the whole point of defaulting to less granular concurrency controls 
is that it's hard to imagine all the possible scenarios.
   
   In this case, it really doesn't seem like there's a good reason to go for 
super granular concurrency control. Did we spend a lot of time blocked 
registering sensors before?
   
   Actually, one condition comes to mind: LinkedList is not threadsafe, and 
accessing the ConcurrentHashMap value is only either a CAS or volatile read, so 
it doesn't create a memory barrier as `synchronized` does. Therefore, different 
threads will only be looking at their own locally cached list for each value in 
the map, although they'll all agree on the set of keys in the map.
   
   If you want to push the current implementation style, then you should use a 
ConcurrentLinkedDeque instead of LinkedList, but I'd really prefer to see the 
`synchronized` blocks come back unless/until there's a compelling performance 
reason to drop them.
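   
   For illustration, a minimal sketch of the thread-safe alternative (Scala; hypothetical names): the map value itself must tolerate concurrent mutation, since the ConcurrentHashMap only guards the key-to-value mapping, not the value's internals.
   
   ```scala
   import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedDeque}
   
   object SensorRegistrySketch {
     private val storeLevelSensors =
       new ConcurrentHashMap[String, ConcurrentLinkedDeque[String]]()
   
     // Sketch only: a deque that is itself thread-safe replaces the LinkedList.
     def rememberSensor(key: String, fullSensorName: String): Unit =
       storeLevelSensors
         .computeIfAbsent(key, _ => new ConcurrentLinkedDeque[String]())
         .push(fullSensorName)
   }
   ```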





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10439) Connect's Values class loses precision for integers, larger than 64 bits

2020-08-26 Thread Oleksandr Diachenko (Jira)
Oleksandr Diachenko created KAFKA-10439:
---

 Summary: Connect's Values class loses precision for integers, 
larger than 64 bits
 Key: KAFKA-10439
 URL: https://issues.apache.org/jira/browse/KAFKA-10439
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Oleksandr Diachenko
Assignee: Oleksandr Diachenko
 Fix For: 2.7.0


The `org.apache.kafka.connect.data.Values#parse` method parses integers that 
are larger than `Long.MAX_VALUE` as `double` with `Schema.FLOAT64_SCHEMA`.

That means it loses precision for these larger integers.

For example:
{code:java}
SchemaAndValue schemaAndValue = Values.parseString("9223372036854775808");
{code}
returns:
{code:java}
SchemaAndValue{schema=Schema{FLOAT64}, value=9.223372036854776E18}
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] satishd commented on a change in pull request #7561: [WIP] KAFKA-7739: Tiered storage

2020-08-26 Thread GitBox


satishd commented on a change in pull request #7561:
URL: https://github.com/apache/kafka/pull/7561#discussion_r477403016



##
File path: core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
##
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import java.io.{File, InputStream}
+import java.nio.file.{Files, Path}
+import java.util
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.function.{Consumer, Function}
+
+import kafka.log.{CleanableIndex, Log, OffsetIndex, OffsetPosition, TimeIndex, 
TransactionIndex, TxnIndexSearchResult}
+import kafka.utils.{CoreUtils, Logging}
+import org.apache.kafka.common.errors.CorruptRecordException
+import org.apache.kafka.common.log.remote.storage.{RemoteLogSegmentId, 
RemoteLogSegmentMetadata, RemoteStorageManager}
+import org.apache.kafka.common.utils.{KafkaThread, Utils}
+
+object RemoteIndexCache {
+  val DirName = "remote-log-index-cache"

Review comment:
   This is resolved with the latest commit. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-26 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r477380346



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -148,7 +153,7 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 boolean rightWinAlreadyCreated = false;
 
 // keep the left type window closest to the record
-Window latestLeftTypeWindow = null;
+KeyValue, ValueAndTimestamp> latestLeftTypeWindow 
= null;
 try (
 final KeyValueIterator, 
ValueAndTimestamp> iterator = windowStore.fetch(

Review comment:
   Ah yeah, the different types of stores are a good point; I haven't tested 
with all of them. I changed it to use 0 or the positive fetch start.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)

2020-08-26 Thread GitBox


vvcephei commented on a change in pull request #8955:
URL: https://github.com/apache/kafka/pull/8955#discussion_r477358755



##
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala
##
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. 
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.serialization
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.UUID
+
+import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer, 
Serdes => JSerdes}
+import org.apache.kafka.streams.kstream.WindowedSerdes
+
+object Serdes extends LowPrioritySerdes {
+  implicit def stringSerde: Serde[String] = JSerdes.String()
+  implicit def longSerde: Serde[Long] = 
JSerdes.Long().asInstanceOf[Serde[Long]]
+  implicit def javaLongSerde: Serde[java.lang.Long] = JSerdes.Long()
+  implicit def byteArraySerde: Serde[Array[Byte]] = JSerdes.ByteArray()
+  implicit def bytesSerde: Serde[org.apache.kafka.common.utils.Bytes] = 
JSerdes.Bytes()
+  implicit def byteBufferSerde: Serde[ByteBuffer] = JSerdes.ByteBuffer()
+  implicit def shortSerde: Serde[Short] = 
JSerdes.Short().asInstanceOf[Serde[Short]]
+  implicit def javaShortSerde: Serde[java.lang.Short] = JSerdes.Short()
+  implicit def floatSerde: Serde[Float] = 
JSerdes.Float().asInstanceOf[Serde[Float]]
+  implicit def javaFloatSerde: Serde[java.lang.Float] = JSerdes.Float()
+  implicit def doubleSerde: Serde[Double] = 
JSerdes.Double().asInstanceOf[Serde[Double]]
+  implicit def javaDoubleSerde: Serde[java.lang.Double] = JSerdes.Double()
+  implicit def intSerde: Serde[Int] = 
JSerdes.Integer().asInstanceOf[Serde[Int]]
+  implicit def javaIntegerSerde: Serde[java.lang.Integer] = JSerdes.Integer()
+  implicit def uuidSerde: Serde[UUID] = JSerdes.UUID()
+
+  implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): 
WindowedSerdes.TimeWindowedSerde[T] =
+new WindowedSerdes.TimeWindowedSerde[T](tSerde)

Review comment:
   Ooh, this one might become a problem.
   
   See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
   
   The fact that we're deprecating this constructor isn't what concerns me, 
it's the reason why: if you were to implicitly use this serde to _read_ 
records, you'd get incorrect results. This constructor fills in the end of the 
window as `Long.MAX_VALUE`, which will be wrong approximately all the time.
   
   Further, when we pass serdes like this, they have to be pre-configured. 
Streams won't call `configure`. We could plan to require an `implicit 
streamsConfig: Properties` argument and then extend KIP-659 to configure this 
serde using the new `window.size.ms` config. But that might not be the right 
window size in practice either, and the fact that it's an implicit makes me 
concerned that it would be really hard to track down the bug in user code.
   
   I'm sort of thinking maybe we should just drop this one implicit, since you 
really need to specify the window size for it to work properly.
   
   Note, the _serializer_ is safe, it's only the _deserializer_ that we need to 
worry about. So, my other idea would be for the implicit to pack up a new serde 
with the `new TimeWindowedSerializer(inner)` and some kind of `new 
ThrowUnsupportedOperationExceptionDeserializer()` that prevents the implicit 
from being used inappropriately. The exception message could say "This implicit 
serde is only for serializing keys. To deserialize as well, you should directly 
provide a WindowedSerdes.TimeWindowedSerde instance."
   
   What do you think?
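   
   For concreteness, a rough sketch of that serialize-only idea (Scala; illustrative only, not a final API):
   
   ```scala
   import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes => JSerdes}
   import org.apache.kafka.streams.kstream.{TimeWindowedSerializer, Windowed}
   
   object WindowedSerdeSketch {
     // Sketch only: pair the safe serializer with a deserializer that refuses
     // to run, so the implicit cannot silently yield wrong window bounds on reads.
     def timeWindowedSerde[T](implicit tSerde: Serde[T]): Serde[Windowed[T]] =
       JSerdes.serdeFrom(
         new TimeWindowedSerializer[T](tSerde.serializer),
         new Deserializer[Windowed[T]] {
           override def deserialize(topic: String, data: Array[Byte]): Windowed[T] =
             throw new UnsupportedOperationException(
               "This implicit serde is only for serializing keys. To deserialize, " +
                 "provide a WindowedSerdes.TimeWindowedSerde with an explicit window size.")
         }
       )
   }
   ```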





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #9179: KAFKA-10390: Remove ignore case option when grep process info to be more specific

2020-08-26 Thread GitBox


lbradstreet commented on a change in pull request #9179:
URL: https://github.com/apache/kafka/pull/9179#discussion_r477352942



##
File path: bin/kafka-server-stop.sh
##
@@ -21,7 +21,7 @@ if [[ $(uname -s) == "OS/390" ]]; then
 fi
 PIDS=$(ps -A -o pid,jobname,comm | grep -i $JOBNAME | grep java | grep -v 
grep | awk '{print $1}')
 else
-PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk 
'{print $1}')
+PIDS=$(ps ax | grep 'kafka\.Kafka' | grep java | grep -v grep | awk 
'{print $1}')

Review comment:
   Is there any reason we can't make it more specific by requiring an exact 
match with surrounding spaces: `' kafka\.Kafka '`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)

2020-08-26 Thread GitBox


vvcephei commented on pull request #8955:
URL: https://github.com/apache/kafka/pull/8955#issuecomment-680917350


   Thanks, @LMnet ! 
   
   The commit policy is just that the committer will squash the whole PR into 
one commit when merging, so you don't have to worry about it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9179: KAFKA-10390: Remove ignore case option when grep process info to be more specific

2020-08-26 Thread GitBox


showuon commented on pull request #9179:
URL: https://github.com/apache/kafka/pull/9179#issuecomment-680858304


   @cmccabe @lbradstreet , could you please review this PR? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

2020-08-26 Thread GitBox


showuon commented on pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#issuecomment-680857804


   @mingaliu @hachikuji @omkreddy , could you please review this PR? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on

2020-08-26 Thread GitBox


showuon commented on pull request #9149:
URL: https://github.com/apache/kafka/pull/9149#issuecomment-680857267


   @kkonstantine , could you please review this small PR? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining

2020-08-26 Thread GitBox


showuon commented on pull request #9062:
URL: https://github.com/apache/kafka/pull/9062#issuecomment-680856775


   @abbccdda @omkreddy @feyman2016 @huxihx , could you help review this PR? 
Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] johnthotekat edited a comment on pull request #9146: KAFKA-10316 Updated Kafka Streams upgrade-guide.html

2020-08-26 Thread GitBox


johnthotekat edited a comment on pull request #9146:
URL: https://github.com/apache/kafka/pull/9146#issuecomment-680817137


   Thanks to you for guiding me through!! And it was so nice to see you on the 
Kafka summit :)  @mjsax



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] johnthotekat commented on pull request #9146: KAFKA-10316 Updated Kafka Streams upgrade-guide.html

2020-08-26 Thread GitBox


johnthotekat commented on pull request #9146:
URL: https://github.com/apache/kafka/pull/9146#issuecomment-680817137


   Thanks to you for guiding me through!! And it was so nice to see you on the 
Kafka summit :) 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi commented on pull request #9150: KAFKA-9839; Broker should accept control requests with newer broker epoch

2020-08-26 Thread GitBox


viktorsomogyi commented on pull request #9150:
URL: https://github.com/apache/kafka/pull/9150#issuecomment-680800344


   @dajac pinging you as well, would you please review this and the 2.3 and 2.4 
backports? :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 opened a new pull request #9223: KAFKA-10438 Lazy initialization of record header to reduce memory usa…

2020-08-26 Thread GitBox


chia7712 opened a new pull request #9223:
URL: https://github.com/apache/kafka/pull/9223


   issue: https://issues.apache.org/jira/browse/KAFKA-10438
   
   There are no checks on the header key, so instantiating the key (bytes to 
string) eagerly is unnecessary. The risk of this PR is that an exception from 
converting byte[] to String can no longer be discovered quickly (the 
conversion error is rare, so it should be fine).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10438) Lazy initialization of record header to reduce memory usage in validating records

2020-08-26 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-10438:
--

 Summary: Lazy initialization of record header to reduce memory 
usage in validating records
 Key: KAFKA-10438
 URL: https://issues.apache.org/jira/browse/KAFKA-10438
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


{code}
  private def validateRecord(batch: RecordBatch, topicPartition: 
TopicPartition, record: Record, batchIndex: Int, now: Long,
 timestampType: TimestampType, timestampDiffMaxMs: 
Long, compactedTopic: Boolean,
 brokerTopicStats: BrokerTopicStats): 
Option[ApiRecordError] = {
if (!record.hasMagic(batch.magic)) {
  brokerTopicStats.allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
  return Some(ApiRecordError(Errors.INVALID_RECORD, new 
RecordError(batchIndex,
s"Record $record's magic does not match outer magic ${batch.magic} in 
topic partition $topicPartition.")))
}

// verify the record-level CRC only if this is one of the deep entries of a 
compressed message
// set for magic v0 and v1. For non-compressed messages, there is no inner 
record for magic v0 and v1,
// so we depend on the batch-level CRC check in 
Log.analyzeAndValidateRecords(). For magic v2 and above,
// there is no record-level CRC to check.
if (batch.magic <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed) {
  try {
record.ensureValid()
  } catch {
case e: InvalidRecordException =>
  brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
  throw new CorruptRecordException(e.getMessage + s" in topic partition 
$topicPartition.")
  }
}

validateKey(record, batchIndex, topicPartition, compactedTopic, 
brokerTopicStats).orElse {
  validateTimestamp(batch, record, batchIndex, now, timestampType, 
timestampDiffMaxMs)
}
  }
{code}

There are no checks on the header key, so instantiating the key (bytes to 
string) eagerly is unnecessary.
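
A minimal sketch of the idea (illustrative class, not the actual patch): keep the raw bytes and defer the conversion until the key is actually read.

{code:scala}
import java.nio.charset.StandardCharsets

// Sketch only: the byte[]-to-String conversion runs at most once, and only if
// some caller actually asks for the key; validation paths that never touch
// headers skip the cost entirely.
class LazyHeader(keyBytes: Array[Byte], val value: Array[Byte]) {
  lazy val key: String = new String(keyBytes, StandardCharsets.UTF_8)
}
{code}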



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-08-26 Thread Jerry Wei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17185062#comment-17185062
 ] 

Jerry Wei commented on KAFKA-10134:
---

[~guozhang] cool, I'll try it later, thanks

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.5.2, 2.6.1
>
> Attachments: consumer3.log.2020-08-20.log, 
> consumer5.log.2020-07-22.log
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance, as our tasks are long-running.
> But after the upgrade, when we kill an instance to trigger a rebalance under 
> some load (some tasks are long-running, >30s), the CPU goes sky-high. It 
> reads ~700% in our metrics, so several threads must be in a tight loop. We 
> have several consumer threads consuming from different partitions during the 
> rebalance. This is reproducible with both the new CooperativeStickyAssignor 
> and the old eager rebalance protocol. The difference is that with the old 
> eager rebalance protocol the high CPU usage drops after the rebalance is 
> done. But when using the cooperative one, the consumer threads seem to be 
> stuck on something and can't finish the rebalance, so the high CPU usage 
> won't drop until we stop our load. Also, a small load without long-running 
> tasks doesn't cause continuous high CPU usage, as the rebalance can finish 
> in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code, we found the clients appear to be in a loop 
> finding the coordinator.
> I also tried the old rebalance protocol on the new version; the issue still 
> exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which doesn't seem to have this issue. So it 
> seems related to something that changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9444) Cannot connect to zookeeper after updating the kafka config

2020-08-26 Thread Rishabh Bohra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rishabh Bohra resolved KAFKA-9444.
--
Resolution: Invalid

Please refer to [KAFKA-7335] - Store clusterId locally to ensure broker joins 
the right cluster
for more info.

> Cannot connect to zookeeper after updating the kafka config
> ---
>
> Key: KAFKA-9444
> URL: https://issues.apache.org/jira/browse/KAFKA-9444
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Rishabh Bohra
>Priority: Major
>  Labels: mesosphere
>
> h4. *Issue:*
> While connecting Kafka to ZooKeeper at a custom path, the following error 
> message pops up in broker-0:
> {{ERROR Fatal error during KafkaServer startup. Prepare to shutdown 
> (kafka.server.KafkaServer) kafka.common.InconsistentClusterIdException: The 
> Cluster ID 2yEEELdtRfKOJQiEurdoFg doesn't match stored clusterId 
> Some(H8dPCWwzRCK4eDmH3l5vvA) in meta.properties. The broker is trying to join 
> the wrong cluster. Configured zookeeper.connect may be wrong. at 
> kafka.server.KafkaServer.startup(KafkaServer.scala:220) at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) at 
> kafka.Kafka$.main(Kafka.scala:84) at kafka.Kafka.main(Kafka.scala)}}
> *Steps:*
> {{1. Start the kafka cluster without any zookeeper path/url}}
>  {{2. Update the kafka configuration with the provided path for zookeeper.}}
>  {{3. Check broker logs}}
> *Package Version:*
> Zookeeper: 3.4.14
>  Zookeeper Client - Zookeeper-3.4.14.jar
>  Kafka - 2.12-2.4.0
> Similar issue can be found 
> [here|https://stackoverflow.com/questions/59592518/kafka-broker-doesnt-find-cluster-id-and-creates-new-one-after-docker-restart]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …

2020-08-26 Thread GitBox


chia7712 commented on pull request #9206:
URL: https://github.com/apache/kafka/pull/9206#issuecomment-680715966


   > Do you have any data on the improvement this achieves?
   
   @ijuma the JMH result is attached. please take a look.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org