[jira] [Created] (KAFKA-14289) Use non zero status code in Kafka CLI when the command failed

2022-10-11 Thread Nicolas Guyomar (Jira)
Nicolas Guyomar created KAFKA-14289:
---

 Summary: Use non zero status code in Kafka CLI when the command 
failed
 Key: KAFKA-14289
 URL: https://issues.apache.org/jira/browse/KAFKA-14289
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Reporter: Nicolas Guyomar


Hi team,

Using the kafka-consumer-groups CLI as an example, running the 
--reset-offsets option on an active consumer group results in an ERROR on 
stdout 

_Error: Assignments can only be reset if the group 'console-consumer-12543' is 
inactive, but the current state is Stable._

but the exit status code is 0. For any automation work it would be helpful to 
return a non-zero status code such as 1 instead.

Was it a design decision to consider the command execution a success (it 
technically is, because the command ran without a bug) and return 0, rather 
than interpreting the Kafka response and mapping it to the CLI exit status 
code?
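For illustration, here is a minimal sketch of what mapping the failure to a non-zero exit status could look like. The class and method names are hypothetical, not the actual kafka-consumer-groups implementation:

```java
// Hypothetical sketch: map the outcome of a CLI sub-command to a process
// exit status instead of always returning 0. Names are illustrative only.
public class CliExitCode {

    // 0 on success, 1 when the broker-side operation was rejected.
    static int exitCodeFor(boolean operationSucceeded) {
        return operationSucceeded ? 0 : 1;
    }

    public static void main(String[] args) {
        boolean succeeded;
        try {
            succeeded = resetOffsets();
        } catch (IllegalStateException e) {
            // Today this error is printed but the process still exits 0.
            System.err.println("Error: " + e.getMessage());
            succeeded = false;
        }
        int code = exitCodeFor(succeeded);
        System.out.println("exit status: " + code);
        // a real CLI would terminate with this code here
    }

    // Stand-in for the real call to the group coordinator.
    static boolean resetOffsets() {
        throw new IllegalStateException(
            "Assignments can only be reset if the group is inactive, but the current state is Stable.");
    }
}
```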

Thank you

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-13617) Enhance delete-expired-group-metadata logging

2022-01-25 Thread Nicolas Guyomar (Jira)
Nicolas Guyomar created KAFKA-13617:
---

 Summary: Enhance delete-expired-group-metadata logging
 Key: KAFKA-13617
 URL: https://issues.apache.org/jira/browse/KAFKA-13617
 Project: Kafka
  Issue Type: Improvement
  Components: offset manager
Reporter: Nicolas Guyomar


Hi team,

When you try to understand why a consumer group offset was expired, the default 
INFO logging on the group coordinator does not reveal much: 

{code:java}
info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.")
{code}

[https://github.com/apache/kafka/blob/22d056c9b76c9bf8417d8701594d1fcee1c6a655/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L834]

 

Would it be possible to enhance this log with the actual group/topic/partition 
that was removed, please? 
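As an illustration, the enhanced logging could emit one line per expired offset in addition to the existing summary. The formatting and method name below are assumptions, not Kafka's actual code:

```java
// Illustrative sketch of the requested logging: one line per expired offset
// with group/topic/partition, alongside the existing summary count.
public class ExpiredOffsetLogging {

    static String expiredOffsetLine(String group, String topic, int partition, long offset) {
        return String.format(
            "Removed expired offset %d for group %s, topic %s, partition %d",
            offset, group, topic, partition);
    }

    public static void main(String[] args) {
        // Example of the line an operator would see in the coordinator log.
        System.out.println(expiredOffsetLine("my-group", "orders", 3, 42L));
    }
}
```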

Thank you





[jira] [Created] (KAFKA-13408) Add a new metric to track invalid task provided offset

2021-10-27 Thread Nicolas Guyomar (Jira)
Nicolas Guyomar created KAFKA-13408:
---

 Summary: Add a new metric to track invalid task provided offset
 Key: KAFKA-13408
 URL: https://issues.apache.org/jira/browse/KAFKA-13408
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Nicolas Guyomar


Hi team,

Whenever a task provides an invalid offset back to the framework, we log a WARN 
message, but we do not yet track a dedicated metric for those events.

In situations where a sink connector task implements the preCommit method and 
has a bug in the way it tracks its assigned topic/partitions, the task may end 
up not reporting any offsets at all. No offset is then tracked on the consumer 
group in Kafka, which can go undetected if we rely only on the existing JMX 
beans for offset commits.

This improvement Jira is to add a dedicated JMX metric so that we can alert 
when such invalid offsets are provided too often or for too long.
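A minimal sketch of the proposed metric: a counter bumped whenever a task hands back an offset that fails validation. The validation rule shown (non-null, non-negative) and the names are illustrative, not the Connect framework's actual checks:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the proposed metric: count offsets rejected by the framework so
// the counter can be exposed over JMX and alerted on.
public class InvalidOffsetMetric {

    private final AtomicLong invalidOffsetCount = new AtomicLong();

    // Returns true when the offset is usable; otherwise records the event.
    boolean validateOffset(Long offset) {
        if (offset == null || offset < 0) {
            invalidOffsetCount.incrementAndGet(); // would be a JMX-registered sensor
            return false;
        }
        return true;
    }

    long invalidOffsets() {
        return invalidOffsetCount.get();
    }
}
```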

 

Thank you





[jira] [Created] (KAFKA-12805) Aborted send could have a different exception than DisconnectException

2021-05-18 Thread Nicolas Guyomar (Jira)
Nicolas Guyomar created KAFKA-12805:
---

 Summary: Aborted send could have a different exception than 
DisconnectException
 Key: KAFKA-12805
 URL: https://issues.apache.org/jira/browse/KAFKA-12805
 Project: Kafka
  Issue Type: Wish
  Components: network
Reporter: Nicolas Guyomar


Right now we are treating a timeout in the network client as a disconnection 
exception, which "hides" legitimate timeouts where increasing 
{{request.timeout.ms}} could be the right fix, since there is no "real" 
network disconnection:
{code:java}
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeConfigs, deadlineMs=1616147081029) timed out at 1616147081039 after 2 attempt(s)
Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled describeConfigs request with correlation id 8 due to node 1 being disconnected
{code}
 

The DisconnectException is thrown because the disconnect flag is set to true in 
[https://github.com/apache/kafka/blob/3d0b4d910b681df7d873c8a0285eaca01d6c173a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L352]

We _could_ take a different path from 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L793]
 that would propagate the fact that the connection timed out because 
{{request.timeout.ms}} expired, and adjust the exception later thrown in 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L1195]
 so that it is not a {{DisconnectException}}.
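To sketch the suggested split: when an in-flight request is aborted, the exception type could be chosen from the cause of the abort rather than always reporting a disconnection. The enum and strings below are purely illustrative, not the NetworkClient's actual API:

```java
// Hypothetical sketch: distinguish a request.timeout.ms expiration from a
// genuine broker disconnection when completing an aborted request.
public class AbortCause {

    enum Cause { REQUEST_TIMEOUT, CONNECTION_LOST }

    // Would map to TimeoutException vs DisconnectException in a real client.
    static String exceptionFor(Cause cause) {
        switch (cause) {
            case REQUEST_TIMEOUT:
                return "TimeoutException: request.timeout.ms expired with the node still connected";
            default:
                return "DisconnectException: node disconnected";
        }
    }
}
```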

 

Thank you

 





[jira] [Created] (KAFKA-12642) Improve Rebalance reason upon metadata change

2021-04-09 Thread Nicolas Guyomar (Jira)
Nicolas Guyomar created KAFKA-12642:
---

 Summary: Improve Rebalance reason upon metadata change
 Key: KAFKA-12642
 URL: https://issues.apache.org/jira/browse/KAFKA-12642
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Nicolas Guyomar


Whenever the known member metadata no longer matches the one from a 
JoinGroupRequest, the GroupCoordinator triggers a rebalance with the reason 
"Updating metadata for member ${member.memberId}", but there are two distinct 
underlying causes in that part of the code in MemberMetadata.scala: 
{code:java}
def matches(protocols: List[(String, Array[Byte])]): Boolean = {
  if (protocols.size != this.supportedProtocols.size)
return false

  for (i <- protocols.indices) {
val p1 = protocols(i)
val p2 = supportedProtocols(i)
if (p1._1 != p2._1 || !util.Arrays.equals(p1._2, p2._2))
  return false
  }
  true
}{code}
Could we improve the rebalance reason with a bit more detail, maybe? 
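For example, a more descriptive reason could mirror the two branches of matches() above: either the number of supported protocols changed, or one protocol's name or metadata differs. The sketch below is illustrative Java, not the coordinator's actual code:

```java
import java.util.Arrays;
import java.util.List;

// Sketch of a more detailed rebalance reason derived from the same two
// comparisons MemberMetadata.matches() performs.
public class RebalanceReason {

    static final class Protocol {
        final String name;
        final byte[] metadata;
        Protocol(String name, byte[] metadata) {
            this.name = name;
            this.metadata = metadata;
        }
    }

    // Returns a human-readable mismatch description, or null when equal.
    static String mismatchReason(List<Protocol> oldP, List<Protocol> newP) {
        if (oldP.size() != newP.size())
            return "protocol count changed from " + oldP.size() + " to " + newP.size();
        for (int i = 0; i < oldP.size(); i++) {
            Protocol a = oldP.get(i), b = newP.get(i);
            if (!a.name.equals(b.name))
                return "protocol name changed at index " + i + ": " + a.name + " -> " + b.name;
            if (!Arrays.equals(a.metadata, b.metadata))
                return "metadata changed for protocol " + a.name;
        }
        return null; // metadata matches, no rebalance needed
    }
}
```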

 

Thank you

 





[jira] [Created] (KAFKA-9461) Limit DEBUG statement size when logging failed record value

2020-01-21 Thread Nicolas Guyomar (Jira)
Nicolas Guyomar created KAFKA-9461:
--

 Summary: Limit DEBUG statement size when logging failed record 
value
 Key: KAFKA-9461
 URL: https://issues.apache.org/jira/browse/KAFKA-9461
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 2.4.0
Reporter: Nicolas Guyomar


Hi,

It is possible with the current implementation that we log a full record 
content at DEBUG level, which can overwhelm the log4j buffer and cause an OOM.

The stack trace below was due to a 70 MB message refused by a broker:

 
{code:java}
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332)
at 
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
at java.lang.StringBuffer.append(StringBuffer.java:270)
at 
org.apache.log4j.helpers.PatternParser$LiteralPatternConverter.format(PatternParser.java:419)
at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
at org.apache.log4j.RollingFileAppender.subAppend(RollingFileAppender.java:276)
at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
at 
org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
at org.apache.log4j.Category.callAppenders(Category.java:206)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.slf4j.impl.Log4jLoggerAdapter.debug(Log4jLoggerAdapter.java:252)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask$1.onCompletion(WorkerSourceTask.java:330){code}

Would it make sense to protect Connect directly in the ConnectRecord toString() 
method and set a configurable limit? 
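A minimal sketch of such a guard, capping the number of characters rendered into the log line. The default shown is a stand-in for a configurable property, and the method names are hypothetical:

```java
// Sketch of the proposed guard: cap how much of a record's value is rendered
// into a DEBUG log line so a single huge record cannot OOM the appender.
public class TruncatingToString {

    static final int DEFAULT_MAX_CHARS = 1024; // hypothetical configurable default

    static String truncate(String value, int maxChars) {
        if (value == null || value.length() <= maxChars)
            return value; // small enough, log as-is
        return value.substring(0, maxChars)
            + "... (" + (value.length() - maxChars) + " chars truncated)";
    }
}
```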

 

Thank you

[https://github.com/apache/kafka/blob/da4337271ef0b72643c0cf47ae51e69f79cf1901/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L348]

 


