[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2018-08-28 Thread Ewen Cheslack-Postava (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595907#comment-16595907
 ] 

Ewen Cheslack-Postava commented on KAFKA-2260:
--

I think it is worth pointing out that this is one *proposed solution* to a (set 
of) challenge(s), but it isn't the only possible solution. I'm seeing this 
referenced elsewhere as if it were the only fix, but some of those challenges 
might already have been addressed by 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging],
 and even if they have not, we should address *current* gaps rather than work 
off of 3-year-old issues. Although this particular improvement has not been 
closed in that time, separate progress has changed the nature of the problem.

> Allow specifying expected offset on produce
> ---
>
> Key: KAFKA-2260
> URL: https://issues.apache.org/jira/browse/KAFKA-2260
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Ben Kirwin
>Priority: Minor
> Attachments: KAFKA-2260.patch, expected-offsets.patch
>
>
> I'd like to propose a change that adds a simple CAS-like mechanism to the 
> Kafka producer. This update has a small footprint, but enables a bunch of 
> interesting uses in stream processing or as a commit log for process state.
> h4. Proposed Change
> In short:
> - Allow the user to attach a specific offset to each message produced.
> - The server assigns offsets to messages in the usual way. However, if the 
> expected offset doesn't match the actual offset, the server should fail the 
> produce request instead of completing the write.
> This is a form of optimistic concurrency control, like the ubiquitous 
> check-and-set -- but instead of checking the current value of some state, it 
> checks the current offset of the log.
> h4. Motivation
> Much like check-and-set, this feature is only useful when there's very low 
> contention. Happily, when Kafka is used as a commit log or as a 
> stream-processing transport, it's common to have just one producer (or a 
> small number) for a given partition -- and in many of these cases, predicting 
> offsets turns out to be quite useful.
> - We get the same benefits as the 'idempotent producer' proposal: a producer 
> can retry a write indefinitely and be sure that at most one of those attempts 
> will succeed; and if two producers accidentally write to the end of the 
> partition at once, we can be certain that at least one of them will fail.
> - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
> messages consecutively to a partition, even if the list is much larger than 
> the buffer size or the producer has to be restarted.
> - If a process is using Kafka as a commit log -- reading from a partition to 
> bootstrap, then writing any updates to that same partition -- it can be sure 
> that it's seen all of the messages in that partition at the moment it does 
> its first (successful) write.
> There's a bunch of other similar use-cases here, but they all have roughly 
> the same flavour.
> h4. Implementation
> The major advantage of this proposal over other suggested transaction / 
> idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
> currently-unused field, adds no new APIs, and requires very little new code 
> or additional work from the server.
> - Produced messages already carry an offset field, which is currently ignored 
> by the server. This field could be used for the 'expected offset', with a 
> sigil value for the current behaviour. (-1 is a natural choice, since it's 
> already used to mean 'next available offset'.)
> - We'd need a new error and error code for a 'CAS failure'.
> - The server assigns offsets to produced messages in 
> {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
> change, this method would assign offsets in the same way -- but if they 
> don't match the offset in the message, we'd return an error instead of 
> completing the write.
> - To avoid breaking existing clients, this behaviour would need to live 
> behind some config flag. (Possibly global, but probably more useful 
> per-topic?)
> I understand all this is unsolicited and possibly strange: happy to answer 
> questions, and if this seems interesting, I'd be glad to flesh this out into 
> a full KIP or patch. (And apologies if this is the wrong venue for this sort 
> of thing!)
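
The check described in the proposal above can be sketched in a few lines. The
following is a minimal in-memory model in Java, not broker code: the class name
`ExpectedOffsetLog`, the `OffsetMismatchException` type, and the `append`
signature are assumptions made for illustration, while the `-1` sentinel
follows the suggestion in the proposal.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory stand-in for a single partition log; hypothetical, not Kafka's broker code. */
final class ExpectedOffsetLog {
    /** Sentinel meaning "no expectation, append at the next available offset". */
    static final long ANY_OFFSET = -1L;

    /** Hypothetical error for the 'CAS failure' case. */
    static final class OffsetMismatchException extends RuntimeException {
        OffsetMismatchException(long expected, long actual) {
            super("expected offset " + expected + " but next offset is " + actual);
        }
    }

    private final List<String> messages = new ArrayList<>();

    /** Appends the message and returns its offset, or fails without writing anything. */
    synchronized long append(long expectedOffset, String message) {
        long nextOffset = messages.size();
        if (expectedOffset != ANY_OFFSET && expectedOffset != nextOffset) {
            throw new OffsetMismatchException(expectedOffset, nextOffset);
        }
        messages.add(message);
        return nextOffset;
    }

    public static void main(String[] args) {
        ExpectedOffsetLog log = new ExpectedOffsetLog();
        System.out.println(log.append(ANY_OFFSET, "a")); // unconditional append
        System.out.println(log.append(1L, "b"));         // expectation matches the log end
        try {
            log.append(1L, "b");                          // retry of the same write
        } catch (OffsetMismatchException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model a producer that retries a write with the same expected offset
gets at most one success, which is the retry property the proposal describes.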



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7354) Fix IdlePercent and NetworkProcessorAvgIdlePercent metric calculation

2018-08-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595827#comment-16595827
 ] 

ASF GitHub Bot commented on KAFKA-7354:
---

huxihx opened a new pull request #5584: KAFKA-7354: Fix IdlePercent and 
NetworkProcessorAvgIdlePercent metric
URL: https://github.com/apache/kafka/pull/5584
 
 
   
   
   https://issues.apache.org/jira/browse/KAFKA-7354
   
   Currently, MBean 
`kafka.network:type=Processor,name=IdlePercent,networkProcessor=*` and 
`kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent` could be 
greater than 1. However, both values represent a percentage, which should 
not exceed 1.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix IdlePercent and NetworkProcessorAvgIdlePercent metric calculation
> -
>
> Key: KAFKA-7354
> URL: https://issues.apache.org/jira/browse/KAFKA-7354
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Major
>
> Currently, MBean 
> `kafka.network:type=Processor,name=IdlePercent,networkProcessor=*` and 
> `kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent` could be 
> greater than 1. However, both values represent a percentage, which should 
> not exceed 1.





[jira] [Commented] (KAFKA-7287) Set open ACL permissions for old consumer znode path

2018-08-28 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595702#comment-16595702
 ] 

Jun Rao commented on KAFKA-7287:


[~omkreddy], thanks for the patch. I merged the PR to trunk and 2.0. It doesn't 
apply cleanly to 1.1. Do you think you could submit a separate PR for 1.1? 
Thanks.

> Set open ACL permissions for old consumer znode path
> 
>
> Key: KAFKA-7287
> URL: https://issues.apache.org/jira/browse/KAFKA-7287
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
>
> Old consumer znode path should have open ACL permissions in a kerberized 
> environment. This was missed in the KafkaZkClient changes.





[jira] [Commented] (KAFKA-7287) Set open ACL permissions for old consumer znode path

2018-08-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595681#comment-16595681
 ] 

ASF GitHub Bot commented on KAFKA-7287:
---

junrao closed pull request #5503: KAFKA-7287: Set open ACL permissions for old 
consumer znode path
URL: https://github.com/apache/kafka/pull/5503
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index f918b616024..760bd67299d 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -429,8 +429,13 @@ object PreferredReplicaElectionZNode {
   }.map(_.toSet).getOrElse(Set.empty)
 }
 
+//old consumer path znode
+object ConsumerPathZNode {
+  def path = "/consumers"
+}
+
 object ConsumerOffset {
-  def path(group: String, topic: String, partition: Integer) = s"/consumers/${group}/offsets/${topic}/${partition}"
+  def path(group: String, topic: String, partition: Integer) = s"${ConsumerPathZNode.path}/${group}/offsets/${topic}/${partition}"
   def encode(offset: Long): Array[Byte] = offset.toString.getBytes(UTF_8)
   def decode(bytes: Array[Byte]): Option[Long] = Option(bytes).map(new String(_, UTF_8).toLong)
 }
@@ -721,7 +726,7 @@ object ZkData {
 
   // These are persistent ZK paths that should exist on kafka broker startup.
   val PersistentZkPaths = Seq(
-"/consumers", // old consumer path
+ConsumerPathZNode.path, // old consumer path
 BrokerIdsZNode.path,
 TopicsZNode.path,
 ConfigEntityChangeNotificationZNode.path,
@@ -743,7 +748,8 @@ object ZkData {
   }
 
   def defaultAcls(isSecure: Boolean, path: String): Seq[ACL] = {
-if (isSecure) {
+//Old Consumer path is kept open as different consumers will write under this node.
+if (!ConsumerPathZNode.path.equals(path) && isSecure) {
   val acls = new ArrayBuffer[ACL]
   acls ++= ZooDefs.Ids.CREATOR_ALL_ACL.asScala
   if (!sensitivePath(path))
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 19fa19dafbc..1cdbe4b2a0e 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -19,10 +19,10 @@ package kafka.security.auth
 
 import kafka.admin.ZkSecurityMigrator
 import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.zk.{ConsumerPathZNode, ZooKeeperTestHarness}
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.security.JaasUtils
-import org.apache.zookeeper.data.ACL
+import org.apache.zookeeper.data.{ACL, Stat}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
@@ -304,4 +304,12 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
 }
 }
   }
+
+  @Test
+  def testConsumerOffsetPathAcls(): Unit = {
+zkClient.makeSurePersistentPathExists(ConsumerPathZNode.path)
+
+val consumerPathAcls = zkClient.currentZooKeeper.getACL(ConsumerPathZNode.path, new Stat())
+assertTrue("old consumer znode path acls are not open", consumerPathAcls.asScala.forall(TestUtils.isAclUnsecure))
+  }
 }
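
As a rough illustration of the condition changed in `defaultAcls` above, the
sketch below models the decision with plain Java values instead of ZooKeeper
ACL objects (and in Java rather than the Scala of the patch). The class name,
the `Acl` labels, the `sensitive` flag, and the open-ACL result for the
non-secure branch are assumptions made for this example; only the `/consumers`
path and the `!ConsumerPathZNode.path.equals(path) && isSecure` check come from
the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class DefaultAclSketch {
    /** Value of ConsumerPathZNode.path in the diff. */
    static final String CONSUMER_PATH = "/consumers";

    /** Hypothetical labels standing in for ZooKeeper ACL lists. */
    enum Acl { CREATOR_ALL, WORLD_READ, WORLD_ALL }

    /** Picks default ACLs; the old consumer path stays open even on a secure cluster. */
    static List<Acl> defaultAcls(boolean isSecure, boolean sensitive, String path) {
        if (!CONSUMER_PATH.equals(path) && isSecure) {
            List<Acl> acls = new ArrayList<>();
            acls.add(Acl.CREATOR_ALL);
            if (!sensitive) {
                acls.add(Acl.WORLD_READ); // assumption: non-sensitive paths are world-readable
            }
            return acls;
        }
        // Assumption: everything else, including "/consumers" itself, gets an open ACL.
        return Collections.singletonList(Acl.WORLD_ALL);
    }

    public static void main(String[] args) {
        System.out.println(defaultAcls(true, false, "/consumers"));
        System.out.println(defaultAcls(true, false, "/brokers/ids"));
        System.out.println(defaultAcls(false, false, "/brokers/ids"));
    }
}
```

Note that the comparison is an exact match on the path, so in this sketch only
the `/consumers` node itself is treated specially, not its children.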


 






[jira] [Commented] (KAFKA-4107) Support offset reset capability in Kafka Connect

2018-08-28 Thread Jeremy Custenborder (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595680#comment-16595680
 ] 

Jeremy Custenborder commented on KAFKA-4107:


I would say no. It should throw a bad request saying the connector is not 
defined. Personally, I think we need better hygiene when it comes to cleaning 
up offsets in general. It's confusing that if I delete my connector and 
recreate it with the same name, it starts at the old offsets. In general, I 
think Connect should be better about cleaning up offsets for both the source 
and sink. I would want to be able to stop a connector, restart from a specific 
offset, and start the connector, all through the API. If I delete a connector, 
I would expect all offsets to be removed.

> Support offset reset capability in Kafka Connect
> 
>
> Key: KAFKA-4107
> URL: https://issues.apache.org/jira/browse/KAFKA-4107
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jason Gustafson
>Priority: Major
>
> It would be useful in some cases to be able to reset connector offsets. For 
> example, if a topic in Kafka corresponding to a source database is 
> accidentally deleted (or deleted because of corrupt data), an administrator 
> may want to reset offsets and reproduce the log from the beginning. It may 
> also be useful to have support for overriding offsets, but that seems like a 
> less likely use case.





[jira] [Commented] (KAFKA-4107) Support offset reset capability in Kafka Connect

2018-08-28 Thread Randall Hauch (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595676#comment-16595676
 ] 

Randall Hauch commented on KAFKA-4107:
--

[~jcustenborder], adding it to the REST API is a very interesting idea. Source 
offsets are specific to the source connector, so wouldn't you need to read the 
latest offset to get the structure and then use a new REST API resource to stop 
the connector, set the offsets, and restart the connector? If the connector was 
not defined, then should the offset reset method still work?



[jira] [Commented] (KAFKA-7353) Connect logs 'this' for anonymous inner classes

2018-08-28 Thread Kevin Lafferty (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595573#comment-16595573
 ] 

Kevin Lafferty commented on KAFKA-7353:
---

An example is this log line in 
o.a.k.connect.runtime.WorkerSourceTask.sendRecords():

{{log.error("{} Task failed initialization and will not be started.", this, 
t);}}

'this' resolves to something like 
'org.apache.kafka.connect.runtime.WorkerSourceTask$1@75dbeed9' instead of the 
overridden toString method on WorkerSourceTask.
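
The effect is easy to reproduce outside of Connect. The following is a small,
self-contained Java sketch; the class `OuterThisDemo` and its `id` field are
made up for illustration and are not part of the Connect runtime. It shows
that plain `this` inside an anonymous inner class refers to the anonymous
instance, while the qualified form `OuterThisDemo.this` refers to the
enclosing object and therefore uses its overridden `toString`.

```java
/** Hypothetical stand-in for a task class with an overridden toString(). */
final class OuterThisDemo {
    private final String id;

    OuterThisDemo(String id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "OuterThisDemo{id=" + id + "}";
    }

    /** Returns two descriptions produced from inside an anonymous inner class. */
    String[] describeFromCallback() {
        final String[] out = new String[2];
        Runnable callback = new Runnable() {
            @Override
            public void run() {
                // Plain 'this' is the anonymous Runnable, so Object.toString() is used.
                out[0] = String.valueOf(this);
                // Qualified 'this' is the enclosing instance, so the override is used.
                out[1] = String.valueOf(OuterThisDemo.this);
            }
        };
        callback.run();
        return out;
    }

    public static void main(String[] args) {
        String[] described = new OuterThisDemo("task-0").describeFromCallback();
        System.out.println("this               -> " + described[0]);
        System.out.println("OuterThisDemo.this -> " + described[1]);
    }
}
```

The first line printed has the same `ClassName$1@hash` shape as the log output
quoted above; the second is the readable form that the issue asks for.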

> Connect logs 'this' for anonymous inner classes
> ---
>
> Key: KAFKA-7353
> URL: https://issues.apache.org/jira/browse/KAFKA-7353
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.2, 1.1.1, 2.0.0
>Reporter: Kevin Lafferty
>Priority: Minor
>
> Some classes in the Kafka Connect runtime create anonymous inner classes that 
> log 'this', resulting in log messages that can't be correlated with any other 
> messages. These should scope 'this' to the outer class to have consistent log 
> messages.





[jira] [Created] (KAFKA-7353) Connect logs 'this' for anonymous inner classes

2018-08-28 Thread Kevin Lafferty (JIRA)
Kevin Lafferty created KAFKA-7353:
-

 Summary: Connect logs 'this' for anonymous inner classes
 Key: KAFKA-7353
 URL: https://issues.apache.org/jira/browse/KAFKA-7353
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.0, 1.1.1, 1.0.2
Reporter: Kevin Lafferty


Some classes in the Kafka Connect runtime create anonymous inner classes that 
log 'this', resulting in log messages that can't be correlated with any other 
messages. These should scope 'this' to the outer class to have consistent log 
messages.





[jira] [Resolved] (KAFKA-6801) Restrict Consumer to fetch data from secure port only, and deny from non-secure port.

2018-08-28 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-6801.
--
Resolution: Information Provided

Closing as per above comment. 

> Restrict Consumer to fetch data from secure port only, and deny from 
> non-secure port.
> -
>
> Key: KAFKA-6801
> URL: https://issues.apache.org/jira/browse/KAFKA-6801
> Project: Kafka
>  Issue Type: Task
>  Components: admin, config, consumer, security
>Affects Versions: 0.10.2.1
>Reporter: VinayKumar
>Priority: Major
>
> I have listeners configured with 2 ports as below:  (9092 -> Plaintext, 9093 
> -> SASL_PLAIN)
> listeners=PLAINTEXT://:9092, SASL_PLAIN://:9093
> For a topic, I want to restrict Consumers to consume data from port 9093 only, 
> and consuming data from port 9092 should be denied.
> I've gone through the ACL concept, but haven't seen an option to restrict a 
> Consumer from pulling data from the non-secure port (in this case, 9092).
> Can someone please let me know if this is configurable?
> Can my requirement be fulfilled? Please provide the necessary info.





[jira] [Commented] (KAFKA-7242) Externalized secrets are revealed in task configuration

2018-08-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595525#comment-16595525
 ] 

ASF GitHub Bot commented on KAFKA-7242:
---

ewencp closed pull request #5475: KAFKA-7242: Reverse xform configs before 
saving
URL: https://github.com/apache/kafka/pull/5475
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
index f5a3737d334..6430ffdd419 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
@@ -53,7 +53,7 @@
  * {@link ConfigProvider#unsubscribe(String, Set, ConfigChangeCallback)} methods.
  */
 public class ConfigTransformer {
-private static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");
+public static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");
 private static final String EMPTY_PATH = "";
 
 private final Map configProviders;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index cadb4e05d9a..82fdeccc96b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -20,9 +20,11 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.ConfigKey;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
@@ -46,6 +48,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -53,6 +56,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions
@@ -431,4 +436,42 @@ private String trace(Throwable t) {
 return null;
 }
 }
+
+    /*
+     * Performs a reverse transformation on a set of task configs, by replacing values with variable references.
+     */
+    public static List<Map<String, String>> reverseTransform(String connName,
+                                                             ClusterConfigState configState,
+                                                             List<Map<String, String>> configs) {
+
+        // Find the config keys in the raw connector config that have variable references
+        Map<String, String> rawConnConfig = configState.rawConnectorConfig(connName);
+        Set<String> connKeysWithVariableValues = keysWithVariableValues(rawConnConfig, ConfigTransformer.DEFAULT_PATTERN);
+
+        List<Map<String, String>> result = new ArrayList<>();
+        for (Map<String, String> config : configs) {
+            Map<String, String> newConfig = new HashMap<>(config);
+            for (String key : connKeysWithVariableValues) {
+                if (newConfig.containsKey(key)) {
+                    newConfig.put(key, rawConnConfig.get(key));
+                }
+            }
+            result.add(newConfig);
+        }
+        return result;
+    }
+
+    private static Set<String> keysWithVariableValues(Map<String, String> rawConfig, Pattern pattern) {
+        Set<String> keys = new HashSet<>();
+        for (Map.Entry<String, String> config : rawConfig.entrySet()) {
+            if (config.getValue() != null) {
+                Matcher matcher = pattern.matcher(config.getValue());
+                if (matcher.matches()) {
+                    keys.add(config.getKey());
+                }
+            }
+        }
+        return keys;
+    }
+
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
index 11693b51795..fc6a50d2fc0 100644
--- 

[jira] [Resolved] (KAFKA-7242) Externalized secrets are revealed in task configuration

2018-08-28 Thread Ewen Cheslack-Postava (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-7242.
--
   Resolution: Fixed
Fix Version/s: 2.1.0
   2.0.1

Issue resolved by pull request 5475
[https://github.com/apache/kafka/pull/5475]
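
The fix in pull request 5475 applies a reverse transformation before task
configs are saved: any task config key whose value in the raw connector config
is a variable reference gets that reference back in place of the resolved
secret. The following is a minimal, self-contained Java sketch of that idea,
not the merged code. The class name `ReverseTransformSketch` is made up for
illustration, and the raw connector config is passed in directly as a map,
whereas the pull request reads it from `ClusterConfigState`; the regular
expression is the one shown as `ConfigTransformer.DEFAULT_PATTERN` in the pull
request.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

final class ReverseTransformSketch {
    // Same expression as ConfigTransformer.DEFAULT_PATTERN: ${provider:[path:]key}
    static final Pattern VARIABLE = Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");

    /** Keys whose raw value is, as a whole, a variable reference. */
    static Set<String> keysWithVariableValues(Map<String, String> rawConfig) {
        Set<String> keys = new HashSet<>();
        for (Map.Entry<String, String> entry : rawConfig.entrySet()) {
            String value = entry.getValue();
            if (value != null && VARIABLE.matcher(value).matches()) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }

    /** Copies each task config, putting variable references back for those keys. */
    static List<Map<String, String>> reverseTransform(Map<String, String> rawConnConfig,
                                                      List<Map<String, String>> taskConfigs) {
        Set<String> variableKeys = keysWithVariableValues(rawConnConfig);
        List<Map<String, String>> result = new ArrayList<>();
        for (Map<String, String> taskConfig : taskConfigs) {
            Map<String, String> copy = new HashMap<>(taskConfig);
            for (String key : variableKeys) {
                if (copy.containsKey(key)) {
                    copy.put(key, rawConnConfig.get(key));
                }
            }
            result.add(copy);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put("connection.password", "${file:/opt/mysecrets:password}");
        raw.put("topics", "test_topic");

        Map<String, String> task = new HashMap<>();
        task.put("connection.password", "actualpassword");
        task.put("topics", "test_topic");
        task.put("task.class", "io.confluent.connect.jdbc.sink.JdbcSinkTask");

        List<Map<String, String>> tasks = new ArrayList<>();
        tasks.add(task);
        System.out.println(reverseTransform(raw, tasks));
    }
}
```

Because the sketch uses `matches()` rather than `find()`, as the pull request
does, only values that consist entirely of one variable reference are restored;
a value that merely embeds a reference inside other text is left as the task
produced it.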

> Externalized secrets are revealed in task configuration
> ---
>
> Key: KAFKA-7242
> URL: https://issues.apache.org/jira/browse/KAFKA-7242
> Project: Kafka
>  Issue Type: Bug
>Reporter: Bahdan Siamionau
>Assignee: Robert Yokota
>Priority: Major
> Fix For: 2.0.1, 2.1.0
>
>
> Trying to use the new [externalized 
> secrets|https://issues.apache.org/jira/browse/KAFKA-6886] feature, I noticed 
> that the task configuration is being saved in the config topic with disclosed 
> secrets. It seems like the main goal of the feature was not achieved: secrets 
> are still persisted in plain text. Probably I'm misusing this new config; 
> please correct me if I'm wrong.
> I'm running Connect in distributed mode, creating a connector with the 
> following config:
> {code:java}
> {
>   "name" : "jdbc-sink-test",
>   "config" : {
>     "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
>     "tasks.max" : "1",
>     "config.providers" : "file",
>     "config.providers.file.class" : "org.apache.kafka.common.config.provider.FileConfigProvider",
>     "config.providers.file.param.secrets" : "/opt/mysecrets",
>     "topics" : "test_topic",
>     "connection.url" : "${file:/opt/mysecrets:url}",
>     "connection.user" : "${file:/opt/mysecrets:user}",
>     "connection.password" : "${file:/opt/mysecrets:password}",
>     "insert.mode" : "upsert",
>     "pk.mode" : "record_value",
>     "pk.field" : "id"
>   }
> }
> {code}
> The connector works fine and placeholders are substituted with the correct 
> values from the file, but then the updated config is written into the topic 
> again (see the 3 following records in the config topic):
> {code:java}
> key: connector-jdbc-sink-test
> value:
> {
>     "properties": {
>         "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
>         "tasks.max": "1",
>         "config.providers": "file",
>         "config.providers.file.class": "org.apache.kafka.common.config.provider.FileConfigProvider",
>         "config.providers.file.param.secrets": "/opt/mysecrets",
>         "topics": "test_topic",
>         "connection.url": "${file:/opt/mysecrets:url}",
>         "connection.user": "${file:/opt/mysecrets:user}",
>         "connection.password": "${file:/opt/mysecrets:password}",
>         "insert.mode": "upsert",
>         "pk.mode": "record_value",
>         "pk.field": "id",
>         "name": "jdbc-sink-test"
>     }
> }
> key: task-jdbc-sink-test-0
> value:
> {
>     "properties": {
>         "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
>         "config.providers.file.param.secrets": "/opt/mysecrets",
>         "connection.password": "actualpassword",
>         "tasks.max": "1",
>         "topics": "test_topic",
>         "config.providers": "file",
>         "pk.field": "id",
>         "task.class": "io.confluent.connect.jdbc.sink.JdbcSinkTask",
>         "connection.user": "datawarehouse",
>         "name": "jdbc-sink-test",
>         "config.providers.file.class": "org.apache.kafka.common.config.provider.FileConfigProvider",
>         "connection.url": "jdbc:postgresql://actualurl:5432/datawarehouse?stringtype=unspecified",
>         "insert.mode": "upsert",
>         "pk.mode": "record_value"
>     }
> }
> key: commit-jdbc-sink-test
> value:
> {
>     "tasks":1
> }
> {code}
> Please advise: have I misunderstood the goal of this feature, have I 
> missed something in the configuration, or is it actually a bug? Thank you.





[jira] [Resolved] (KAFKA-5962) java.io.IOException: Map failed

2018-08-28 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-5962.
--
Resolution: Fixed

Closing as docs added in KAFKA-6343

> java.io.IOException: Map failed
> ---
>
> Key: KAFKA-5962
> URL: https://issues.apache.org/jira/browse/KAFKA-5962
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.0
> Environment: kafka_2.12-0.11.0.0
>Reporter: Mehmet Soner
>Priority: Critical
> Attachments: broker-log-failed-during-restart.txt, 
> broker-log-start-and-shutdown.txt
>
>
> *OS:* HP-UX B.11.31 U ia64
> Steps to reproduce the bug:
> *1) Starting zookeeper by using below command.*
> zookeeper-server-start.sh -daemon 
> /usr/local/Apache/kafka_2.12-0.11.0.0/config/zookeeper.properties
> *2) Starting kafka by using below command.*
> kafka-server-start.sh -daemon 
> /usr/local/Apache/kafka_2.12-0.11.0.0/config/server0.properties
> *3) Writing data to topic*
> /prov/users/sas/bin$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 
> 172.31.19.85:9092 --topic ssmtopup_topic
> >test1
> >test2
> >test3
> >test4
> >test5
> *server.log*
> [2017-09-22 09:51:24,467] INFO Updated PartitionLeaderEpoch. New: {epoch:7, 
> offset:0}, Current: {epoch:-1, offset-1} for Partition: ssmtopup_topic-0. 
> Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
> [2017-09-22 09:52:23,819] INFO [Group Metadata Manager on Broker 0]: Removed 
> 0 expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)
> *4) Exiting by pressing CTRL+C*
> *5) Shutting down kafka by using below command*
> *a) finding the pid by using the command below*
> jps -lm
> *b) graceful shutdown*
> kill -15 
> *You can find error below*
> *server.log *
> [2017-09-22 09:52:26,179] INFO [Kafka Server 0], shutting down 
> (kafka.server.KafkaServer)
> [2017-09-22 09:52:26,219] INFO [Kafka Server 0], Starting controlled shutdown 
> (kafka.server.KafkaServer)
> [2017-09-22 09:52:26,346] INFO [Kafka Server 0], Controlled shutdown 
> succeeded (kafka.server.KafkaServer)
> [2017-09-22 09:52:26,356] INFO [Socket Server on Broker 0], Shutting down 
> (kafka.network.SocketServer)
> [2017-09-22 09:52:26,384] INFO [Socket Server on Broker 0], Shutdown 
> completed (kafka.network.SocketServer)
> [2017-09-22 09:52:26,386] INFO [Kafka Request Handler on Broker 0], shutting 
> down (kafka.server.KafkaRequestHandlerPool)
> [2017-09-22 09:52:26,395] INFO [Kafka Request Handler on Broker 0], shut down 
> completely (kafka.server.KafkaRequestHandlerPool)
> [2017-09-22 09:52:26,411] INFO [ThrottledRequestReaper-Fetch]: Shutting down 
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> [2017-09-22 09:52:27,032] INFO [ThrottledRequestReaper-Fetch]: Stopped 
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> [2017-09-22 09:52:27,034] INFO [ThrottledRequestReaper-Fetch]: Shutdown 
> completed (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> [2017-09-22 09:52:27,034] INFO [ThrottledRequestReaper-Produce]: Shutting 
> down (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> [2017-09-22 09:52:27,701] INFO [ThrottledRequestReaper-Produce]: Shutdown 
> completed (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> [2017-09-22 09:52:27,701] INFO [ThrottledRequestReaper-Produce]: Stopped 
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> [2017-09-22 09:52:27,701] INFO [ThrottledRequestReaper-Request]: Shutting 
> down (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> [2017-09-22 09:52:27,985] INFO [ThrottledRequestReaper-Request]: Shutdown 
> completed (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> [2017-09-22 09:52:27,985] INFO [ThrottledRequestReaper-Request]: Stopped 
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> [2017-09-22 09:52:27,988] INFO [KafkaApi-0] Shutdown complete. 
> (kafka.server.KafkaApis)
> [2017-09-22 09:52:27,991] INFO [ExpirationReaper-0-topic]: Shutting down 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-09-22 09:52:28,082] INFO [ExpirationReaper-0-topic]: Stopped 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-09-22 09:52:28,088] INFO [ExpirationReaper-0-topic]: Shutdown completed 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-09-22 09:52:28,096] INFO [Transaction Coordinator 0]: Shutting down. 
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2017-09-22 09:52:28,100] INFO [ProducerId Manager 0]: Shutdown complete: 
> last producerId assigned 7000 
> (kafka.coordinator.transaction.ProducerIdManager)
> [2017-09-22 09:52:28,102] INFO [Transaction State Manager 0]: Shutdown 
> complete (kafka.coordinator.transaction.TransactionStateManager)
> [2017-09-22 09:52:28,102] INFO [Transaction 

[jira] [Resolved] (KAFKA-1712) Excessive storage usage on newly added node

2018-08-28 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-1712.
--
Resolution: Fixed

Fixed via https://issues.apache.org/jira/browse/KAFKA-2511

> Excessive storage usage on newly added node
> ---
>
> Key: KAFKA-1712
> URL: https://issues.apache.org/jira/browse/KAFKA-1712
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: Oleg Golovin
>Priority: Major
>
> When a new node is added to a cluster, data starts replicating to it. The 
> mtime of each newly created segment is set when the last message is written 
> to it. Although replication is a prolonged process, let's assume (for 
> simplicity of explanation) that the segment mtimes are very close to the time 
> when the new node was added.
> After the replication is done, new data starts to flow into the new 
> node. After `log.retention.hours`, the amount of data will be 2 * 
> daily_amount_of_data_in_kafka_node: the first part is the data replicated 
> from other nodes when the node was added (let us call that time `t1`), and 
> the second is the data replicated from other nodes between `t1` and 
> `t1 + log.retention.hours`. So by that time the node will hold twice as much 
> data as the other nodes.
> This is a big problem for us, as our storage is sized to fit the normal 
> amount of data (not twice that amount).
> In our particular case it poses another problem. We have an emergency segment 
> cleaner which runs when storage is nearly full (>90%). We try to balance 
> the amount of data so that it does not need to run and we can rely solely on 
> Kafka's internal log deletion, but sometimes the emergency cleaner runs.
> It works this way:
> - it gets all kafka segments for the volume
> - it filters out last segments of each partition (just to avoid unnecessary 
> recreation of last small-size segments)
> - it sorts them by segment mtime
> - it changes the mtime of the first N segments (those with the lowest mtime) 
> to 1, so they appear extremely old. N is chosen to free a specified 
> percentage of the volume (3% in our case). Kafka deletes these segments later 
> (as they are very old).
> The emergency cleaner works very well, except when the data has been 
> replicated to a newly added node. 
> In that case the segment mtime is the time the segment was replicated and 
> does not reflect the real creation time of the original data stored in it.
> So the emergency cleaner will delete the segments with the lowest 
> mtime, which may hold data that is much more recent than the data in 
> other segments.
> This is not a big problem until we delete data which hasn't been fully 
> consumed.
> In that case we lose data, and this makes it a big problem.
> Is it possible to retain segment mtime during initial replication on a new 
> node?
> This would help avoid loading the new node with twice as much 
> data as other nodes have.
> Or maybe there are other ways to sort segments by data creation time (or 
> close to data creation time)? (For example, if this ticket is implemented, 
> https://issues.apache.org/jira/browse/KAFKA-1403, we may take the time of the 
> first message from .index.) In our case it would help the emergency 
> cleaner delete only the truly oldest data.
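
The emergency cleaner steps listed in the report could be sketched roughly as follows. This only illustrates the selection logic and is not the reporter's actual tool: the `Segment` tuple, the `select_segments_to_expire` name, and all sizes are assumptions made for this example.

```python
from collections import namedtuple

# Hypothetical record for one log segment file (not a Kafka API).
Segment = namedtuple("Segment", ["partition", "base_offset", "mtime", "size"])


def select_segments_to_expire(segments, volume_size, free_fraction=0.03):
    """Pick the oldest-by-mtime segments until free_fraction of the volume is covered."""
    # Skip the last (highest base offset) segment of every partition.
    last_offset = {}
    for seg in segments:
        last_offset[seg.partition] = max(last_offset.get(seg.partition, -1), seg.base_offset)
    candidates = [s for s in segments if s.base_offset != last_offset[s.partition]]

    # Oldest mtime first; stop once enough bytes have been selected.
    candidates.sort(key=lambda s: s.mtime)
    target = volume_size * free_fraction
    chosen, freed = [], 0
    for seg in candidates:
        if freed >= target:
            break
        chosen.append(seg)
        freed += seg.size
    return chosen
```

A real tool would presumably then reset the mtime of each chosen file (for example with `os.utime(path, (1, 1))`). Because the ordering key is mtime, segments on a freshly replicated node are ranked by replication time rather than by the age of the data, which is the failure mode the report describes.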



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-1665) controller state gets stuck in message after execute

2018-08-28 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-1665.
--
Resolution: Auto Closed

Closing inactive issue. Closing as per above comments.

> controller state gets stuck in message after execute
> 
>
> Key: KAFKA-1665
> URL: https://issues.apache.org/jira/browse/KAFKA-1665
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Reporter: Joe Stein
>Priority: Major
>
> I had a 0.8.1.1 Kafka Broker go down, and I was trying to use the reassign 
> partition script to move topics off that broker. When I describe the topics, 
> I see the following:
> Topic: mini__022active_120__33__mini Partition: 0 Leader: 2131118 
> Replicas: 2131118,2166601,2163421 Isr: 2131118,2166601
> This shows that the broker “2163421” is down. So I create the following file 
> /tmp/move_topic.json:
> {
>   "version": 1,
>   "partitions": [
>     {
>       "topic": "mini__022active_120__33__mini",
>       "partition": 0,
>       "replicas": [2131118, 2166601, 2156998]
>     }
>   ]
> }
> And then do this:
> ./kafka-reassign-partitions.sh --execute --reassignment-json-file 
> /tmp/move_topic.json
> Successfully started reassignment of partitions 
> {"version":1,"partitions":[{"topic":"mini__022active_120__33__mini","partition":0,"replicas":[2131118,2166601,2156998]}]}
> However, when I try to verify this, I get the following error:
> ./kafka-reassign-partitions.sh --verify --reassignment-json-file 
> /tmp/move_topic.json
> Status of partition reassignment:
> ERROR: Assigned replicas (2131118,2166601,2156998,2163421) don't match the 
> list of replicas for reassignment (2131118,2166601,2156998) for partition 
> [mini__022active_120__33__mini,0]
> Reassignment of partition [mini__022active_120__33__mini,0] failed
> If I describe the topics, I now see there are 4 replicas. This has been like 
> this for many hours now, so it seems to have permanently moved to 4 replicas 
> for some reason.
> Topic:mini__022active_120__33__mini PartitionCount:1 ReplicationFactor:4 
> Configs:
> Topic: mini__022active_120__33__mini Partition: 0 Leader: 2131118 
> Replicas: 2131118,2166601,2156998,2163421 Isr: 2131118,2166601
> If I re-execute and re-verify, I get the same error. So it seems to be wedged.
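
The mismatch reported by `--verify` can be reproduced on paper with a small sketch. The helper names `build_reassignment` and `diff_replicas` are hypothetical and not part of any Kafka tool; the broker ids and topic name are taken from the report.

```python
import json


def build_reassignment(topic, partition, replicas):
    """Return the reassignment document in the JSON layout shown in the report."""
    return {
        "version": 1,
        "partitions": [
            {"topic": topic, "partition": partition, "replicas": list(replicas)}
        ],
    }


def diff_replicas(assigned, target):
    """Return (still_to_remove, still_to_add) between current and target replica lists."""
    to_remove = [b for b in assigned if b not in target]
    to_add = [b for b in target if b not in assigned]
    return to_remove, to_add


plan = build_reassignment("mini__022active_120__33__mini", 0, [2131118, 2166601, 2156998])
print(json.dumps(plan, separators=(",", ":")))

assigned = [2131118, 2166601, 2156998, 2163421]  # from the --verify error in the report
target = plan["partitions"][0]["replicas"]
print(diff_replicas(assigned, target))
```

The only discrepancy this shows is that the dead broker 2163421 is still in the assigned list, while nothing from the target list is missing.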





[jira] [Resolved] (KAFKA-2127) Running TopicCommand --alter causes connection close/reset errors in kafka logs

2018-08-28 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2127.
--
Resolution: Auto Closed

Closing inactive issue. Please reopen if the issue still exists in newer 
versions.

> Running TopicCommand --alter causes connection close/reset errors in kafka 
> logs
> ---
>
> Key: KAFKA-2127
> URL: https://issues.apache.org/jira/browse/KAFKA-2127
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Jason Rosenberg
>Priority: Minor
>
> I am using 0.8.2.1.  I've been noticing that any time I use TopicCommand to 
> alter a topic (e.g. add partitions) or delete a topic, the broker logs show a 
> bunch of closed connections, and usually 2 or 3 Connection reset exceptions.  
> It logs these with ERROR status.
> I recently used the kafka.admin.TopicCommand tool to increase the partitions 
> for a topic from 1 to 4.  So I ran:
> {code}
>  java -cp kafka.jar kafka.admin.TopicCommand --zookeeper myzkserver:12345 
> --topic mytopic --alter --partitions 4
> {code}
> This resulted in the following sequence in the broker log (repeated pretty 
> much in the logs of each broker):
> {code}
> 2015-04-16 03:51:26,156  INFO [kafka-network-thread-27330-1] 
> network.Processor - Closing socket connection to /1.2.3.12.
> 2015-04-16 03:51:26,169  INFO [kafka-network-thread-27330-0] 
> network.Processor - Closing socket connection to /1.2.3.89.
> 2015-04-16 03:51:26,169  INFO [kafka-network-thread-27330-0] 
> network.Processor - Closing socket connection to /1.2.3.95.
> 2015-04-16 03:51:26,176 ERROR [kafka-network-thread-27330-2] 
> network.Processor - Closing socket for /1.2.4.34 because of error
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at kafka.utils.Utils$.read(Utils.scala:380)
> at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Processor.read(SocketServer.scala:444)
> at kafka.network.Processor.run(SocketServer.scala:340)
> at java.lang.Thread.run(Thread.java:745)
> 2015-04-16 03:51:26,178 ERROR [kafka-network-thread-27330-1] 
> network.Processor - Closing socket for /1.2.4.59 because of error
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at kafka.utils.Utils$.read(Utils.scala:380)
> at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Processor.read(SocketServer.scala:444)
> at kafka.network.Processor.run(SocketServer.scala:340)
> at java.lang.Thread.run(Thread.java:745)
> 2015-04-16 03:51:26,192 ERROR [kafka-network-thread-27330-1] 
> network.Processor - Closing socket for /1.2.3.11 because of error
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at kafka.utils.Utils$.read(Utils.scala:380)
> at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Processor.read(SocketServer.scala:444)
> at kafka.network.Processor.run(SocketServer.scala:340)
> at java.lang.Thread.run(Thread.java:745)
> 2015-04-16 03:51:26,451  INFO [kafka-request-handler-3] 
> server.ReplicaFetcherManager - [ReplicaFetcherManager on broker 45] Removed 
> fetcher for partitions [mytopic,2]
> 2015-04-16 03:51:26,453  INFO [kafka-request-handler-3] log.Log - Completed 
> load of log mytopic-2 with log end offset 0
> 2015-04-16 03:51:26,454  INFO [kafka-request-handler-3] log.LogManager - 
> Created log for partition [mytopic,2] in /data/kafka_logs with properties 
> {segment.index.bytes -> 10485760, file.delete.delay.ms -> 6, 
> segment.bytes -> 1073741824, flush.ms -> 9223372036854775807, 
> delete.retention.ms -> 8640, index.interval.bytes -> 4096, 
> retention.bytes -> 500, 

[jira] [Resolved] (KAFKA-7291) kafka-console-consumer.sh waits on inexistent topic

2018-08-28 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-7291.
--
Resolution: Fixed

Fixed in newer versions. Yes, you need to upgrade to the latest version.

> kafka-console-consumer.sh waits on inexistent topic
> ---
>
> Key: KAFKA-7291
> URL: https://issues.apache.org/jira/browse/KAFKA-7291
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.2.2
>Reporter: HUSLEAG Dumitru
>Priority: Major
>
> Hello,
> My request concerns kafka-console-consumer.sh behavior (which I suppose is 
> based on KafkaConsumer).
> If I try to consume from a topic that does not exist, it connects and waits 
> as if the topic existed, which I find illogical.
> The broker does not have *{{auto.create.topics.enable}}* enabled.
> Let's say I launch this command:
> {code}
> ./kafka-console-consumer.sh --zookeeper $(hostname):2181/kafka --topic 
> inexistentTopic
> {code}
> and inexistentTopic indeed does not exist; then kafka-console-consumer.sh 
> will wait forever instead of exiting with an error code and displaying an 
> error message saying that the topic does not exist. 
> That is how I would expect it to behave in this case.
> Please consider this request.
> Regards,
> Dumitru
>  





[jira] [Assigned] (KAFKA-6789) Add retry logic in AdminClient requests

2018-08-28 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar reassigned KAFKA-6789:


Assignee: Manikumar

> Add retry logic in AdminClient requests
> ---
>
> Key: KAFKA-6789
> URL: https://issues.apache.org/jira/browse/KAFKA-6789
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Guozhang Wang
>Assignee: Manikumar
>Priority: Major
>
> In KafkaAdminClient, today we treat all error codes as fatal and set the 
> exception accordingly in the returned futures. But some error codes can be 
> retried internally, for example, COORDINATOR_LOADING_IN_PROGRESS. We 
> could consider adding the retry logic internally in the admin client so that 
> users would not need to retry themselves.
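
The kind of internal retry proposed here might look roughly like the following. This is a generic sketch, not KafkaAdminClient code: `AdminError`, `call_with_retries`, and the contents of `RETRIABLE` are assumptions made for illustration.

```python
import time

# Assumption for this sketch: which error names count as retriable.
RETRIABLE = {"COORDINATOR_LOADING_IN_PROGRESS"}


class AdminError(Exception):
    """Hypothetical exception carrying a broker error code name."""

    def __init__(self, code):
        super().__init__(code)
        self.code = code


def call_with_retries(request, max_retries=3, backoff_s=0.0, sleep=time.sleep):
    """Invoke request(); retry only when it fails with a retriable error code."""
    attempt = 0
    while True:
        try:
            return request()
        except AdminError as err:
            if err.code not in RETRIABLE or attempt >= max_retries:
                raise  # fatal error, or retry budget exhausted
            attempt += 1
            sleep(backoff_s)
```

Keeping the retriable set explicit means every other error code still surfaces to the caller immediately, as it does today.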





[jira] [Updated] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector

2018-08-28 Thread Yu Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Yang updated KAFKA-7304:
---
Attachment: Screen Shot 2018-08-28 at 11.09.45 AM.png

> memory leakage in org.apache.kafka.common.network.Selector
> --
>
> Key: KAFKA-7304
> URL: https://issues.apache.org/jira/browse/KAFKA-7304
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Yu Yang
>Priority: Critical
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
> Attachments: 7304.v4.txt, 7304.v7.txt, Screen Shot 2018-08-16 at 
> 11.04.16 PM.png, Screen Shot 2018-08-16 at 11.06.38 PM.png, Screen Shot 
> 2018-08-16 at 12.41.26 PM.png, Screen Shot 2018-08-16 at 4.26.19 PM.png, 
> Screen Shot 2018-08-17 at 1.03.35 AM.png, Screen Shot 2018-08-17 at 1.04.32 
> AM.png, Screen Shot 2018-08-17 at 1.05.30 AM.png, Screen Shot 2018-08-28 at 
> 11.09.45 AM.png
>
>
> We are testing secured writes to Kafka over SSL. At small scale, SSL writes 
> to Kafka were fine. However, when we enabled SSL writes at a larger scale 
> (>40k clients writing concurrently), the Kafka brokers soon hit an 
> OutOfMemory error with a 4G heap setting. We tried increasing the heap size 
> to 10G, but encountered the same issue. 
> We took a few heap dumps and found that most of the heap memory is 
> referenced through org.apache.kafka.common.network.Selector objects. There 
> are two channel map fields in Selector. It seems that somehow the objects 
> are not removed from the maps in a timely manner. 
> One observation is that the memory leak seems related to Kafka partition 
> leader changes. If a broker restart or similar event in the cluster causes a 
> partition leadership change, the brokers may hit the OOM issue faster. 
> {code}
> private final Map channels;
> private final Map closingChannels;
> {code}
> Please see the attached images and the following link for a sample GC 
> analysis: 
> http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMTcvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS0yLTM5LTM0
> The command line for running Kafka: 
> {code}
> java -Xms10g -Xmx10g -XX:NewSize=512m -XX:MaxNewSize=512m 
> -Xbootclasspath/p:/usr/local/libs/bcp -XX:MetaspaceSize=128m -XX:+UseG1GC 
> -XX:MaxGCPauseMillis=25 -XX:InitiatingHeapOccupancyPercent=35 
> -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=25 
> -XX:MaxMetaspaceFreeRatio=75 -XX:+PrintGCDetails -XX:+PrintGCDateStamps 
> -XX:+PrintTenuringDistribution -Xloggc:/var/log/kafka/gc.log 
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=40 -XX:GCLogFileSize=50M 
> -Djava.awt.headless=true 
> -Dlog4j.configuration=file:/etc/kafka/log4j.properties 
> -Dcom.sun.management.jmxremote 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false 
> -Dcom.sun.management.jmxremote.port= 
> -Dcom.sun.management.jmxremote.rmi.port= -cp /usr/local/libs/*  
> kafka.Kafka /etc/kafka/server.properties
> {code}
> We use Java 1.8.0_102, and have applied a TLS patch that reduces the 
> X509Factory.certCache map size from 750 to 20. 
> {code}
> java -version
> java version "1.8.0_102"
> Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode)
> {code}





[jira] [Updated] (KAFKA-7164) Follower should truncate after every leader epoch change

2018-08-28 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-7164:
---
Fix Version/s: 1.1.2

> Follower should truncate after every leader epoch change
> 
>
> Key: KAFKA-7164
> URL: https://issues.apache.org/jira/browse/KAFKA-7164
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Bob Barrett
>Priority: Major
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Currently we skip log truncation for followers if a LeaderAndIsr request is 
> received, but the leader does not change. This can lead to log divergence if 
> the follower missed a leader change before the current known leader was 
> reelected. Basically the problem is that the leader may truncate its own log 
> prior to becoming leader again, so the follower would need to reconcile its 
> log again.
> For example, suppose that we have three replicas: r1, r2, and r3. Initially, 
> r1 is the leader in epoch 0 and writes one record at offset 0. r3 replicates 
> this successfully.
> {code}
> r1: 
>   status: leader
>   epoch: 0
>   log: [{id: 0, offset: 0, epoch:0}]
> r2: 
>   status: follower
>   epoch: 0
>   log: []
> r3: 
>   status: follower
>   epoch: 0
>   log: [{id: 0, offset: 0, epoch:0}]
> {code}
> Suppose then that r2 becomes leader in epoch 1. r1 notices the leader change 
> and truncates, but r3, for whatever reason, does not.
> {code}
> r1: 
>   status: follower
>   epoch: 1
>   log: []
> r2: 
>   status: leader
>   epoch: 1
>   log: []
> r3: 
>   status: follower
>   epoch: 0
>   log: [{id: 0, offset: 0, epoch:0}]
> {code}
> Now suppose that r2 fails and r1 becomes the leader in epoch 2. Immediately 
> it writes a new record:
> {code}
> r1: 
>   status: leader
>   epoch: 2
>   log: [{id: 1, offset: 0, epoch:2}]
> r2: 
>   status: follower
>   epoch: 2
>   log: []
> r3: 
>   status: follower
>   epoch: 0
>   log: [{id: 0, offset: 0, epoch:0}]
> {code}
> If the replica continues fetching with the old epoch, we can have log 
> divergence as noted in KAFKA-6880. However, if r3 successfully receives the 
> new LeaderAndIsr request which updates the epoch to 2, but skips the 
> truncation, then the logs will stay inconsistent.
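
The final state of the example can be checked with a small sketch that compares epochs at matching offsets. It is a simplification made for illustration (it only looks at offsets both logs contain) and is not how the broker implements truncation; `first_divergence` and `truncate_from` are hypothetical names.

```python
def first_divergence(leader_log, follower_log):
    """Return the first offset where the two logs disagree on epoch, else None."""
    leader = {rec["offset"]: rec["epoch"] for rec in leader_log}
    for rec in follower_log:
        if rec["offset"] in leader and leader[rec["offset"]] != rec["epoch"]:
            return rec["offset"]
    return None


def truncate_from(log, offset):
    """Drop every record at or after offset, as a follower would on truncation."""
    return [rec for rec in log if rec["offset"] < offset]


# Final state from the example: r1 leads epoch 2, r3 still holds the epoch-0 record.
r1 = [{"id": 1, "offset": 0, "epoch": 2}]
r3 = [{"id": 0, "offset": 0, "epoch": 0}]

offset = first_divergence(r1, r3)
print(offset)
r3 = truncate_from(r3, offset)
print(r3)
```

Both replicas hold a record at offset 0 but with different epochs, so r3 has to drop its record before fetching from r1. Skipping that truncation step is what leaves the logs inconsistent.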





[jira] [Resolved] (KAFKA-7128) Lagging high watermark can lead to committed data loss after ISR expansion

2018-08-28 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7128.

   Resolution: Fixed
Fix Version/s: 2.1.0
               2.0.1
               1.1.2

> Lagging high watermark can lead to committed data loss after ISR expansion
> --
>
> Key: KAFKA-7128
> URL: https://issues.apache.org/jira/browse/KAFKA-7128
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Anna Povzner
>Priority: Major
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Some model checking exposed a weakness in the ISR expansion logic. We know 
> that the high watermark can go backwards after a leader failover, but we may 
> not have known that this can lead to the loss of committed data.
> Say we have three replicas: r1, r2, and r3. Initially, the ISR consists of 
> (r1, r2) and the leader is r1. r3 is a new replica which has not begun 
> fetching. The data up to offset 10 has been committed to the ISR. Here is the 
> initial state:
> State 1
> ISR: (r1, r2)
>  Leader: r1
>  r1: [hw=10, leo=10]
>  r2: [hw=5, leo=10]
>  r3: [hw=0, leo=0]
> Replica 1 then initiates shutdown (or fails) and leaves the ISR, which makes 
> r2 the new leader. The high watermark on r2 is still lagging behind r1's.
> State 2
> ISR: (r2)
>  Leader: r2
>  r1 (offline): [hw=10, leo=10]
>  r2: [hw=5, leo=10]
>  r3: [hw=0, leo=0]
> Replica 3 then catches up to the high watermark on r2 and joins the ISR. 
> Perhaps its high watermark is lagging behind r2, but this is unimportant.
> State 3
> ISR: (r2, r3)
>  Leader: r2
>  r1 (offline): [hw=10, leo=10]
>  r2: [hw=5, leo=10]
>  r3: [hw=0, leo=5]
> Now r2 fails and r3 is elected leader and is the only member of the ISR. The 
> committed data from offsets 5 to 10 has been lost.
> State 4
> ISR: (r3)
>  Leader: r3
>  r1 (offline): [hw=10, leo=10]
>  r2 (offline): [hw=5, leo=10]
>  r3: [hw=0, leo=5]
> The bug is the fact that we allowed r3 into the ISR after the local high 
> watermark had been reached. Since the follower does not know the true high 
> watermark for the previous leader's epoch, it should not allow a replica to 
> join the ISR until it has caught up to an offset within its own epoch.
> Note this is related to 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change]
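
The proposed rule can be tried against State 3 with a small sketch. The function below is a hypothetical illustration of the two admission checks, not the broker's code. It assumes r2's leader epoch starts at offset 10, since r2 had leo=10 when it became leader.

```python
def may_join_isr(replica_leo, leader_hw, leader_epoch_start_offset=None):
    """ISR admission check; pass leader_epoch_start_offset to apply the stricter rule."""
    if replica_leo < leader_hw:
        return False
    if leader_epoch_start_offset is not None:
        return replica_leo >= leader_epoch_start_offset
    return True


# State 3 from the description: r2 leads with hw=5, leo=10; r3 has leo=5.
old_rule = may_join_isr(replica_leo=5, leader_hw=5)
new_rule = may_join_isr(replica_leo=5, leader_hw=5, leader_epoch_start_offset=10)
caught_up = may_join_isr(replica_leo=10, leader_hw=5, leader_epoch_start_offset=10)
print(old_rule, new_rule, caught_up)
```

Under the old check r3 is admitted at leo=5 and offsets 5 to 10 can be lost. With the extra condition it is admitted only once it has fetched up to offset 10.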





[jira] [Commented] (KAFKA-7128) Lagging high watermark can lead to committed data loss after ISR expansion

2018-08-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595357#comment-16595357
 ] 

ASF GitHub Bot commented on KAFKA-7128:
---

hachikuji closed pull request #5557: KAFKA-7128: Follower has to catch up to 
offset within current leader epoch to join ISR
URL: https://github.com/apache/kafka/pull/5557
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index a92340f2a4b..22c1508bb8c 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -62,6 +62,9 @@ class Partition(val topic: String,
   private val leaderIsrUpdateLock = new ReentrantReadWriteLock
   private var zkVersion: Int = LeaderAndIsr.initialZKVersion
   @volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
+  // start offset for 'leaderEpoch' above (leader epoch of the current leader for this partition),
+  // defined when this broker is leader for partition
+  @volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
   @volatile var leaderReplicaIdOpt: Option[Int] = None
   @volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica]
 
@@ -263,6 +266,7 @@ class Partition(val topic: String,
   allReplicasMap.clear()
   inSyncReplicas = Set.empty[Replica]
   leaderReplicaIdOpt = None
+  leaderEpochStartOffsetOpt = None
   removePartitionMetrics()
   logManager.asyncDelete(topicPartition)
   logManager.asyncDelete(topicPartition, isFuture = true)
@@ -287,18 +291,19 @@ class Partition(val topic: String,
   // remove assigned replicas that have been removed by the controller
   (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
   inSyncReplicas = newInSyncReplicas
+  newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))
 
+  val leaderReplica = getReplica().get
+  val leaderEpochStartOffset = leaderReplica.logEndOffset.messageOffset
   info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.basePartitionState.leaderEpoch} from " +
-s"offset ${getReplica().get.logEndOffset.messageOffset}. Previous Leader Epoch was: $leaderEpoch")
+s"offset $leaderEpochStartOffset. Previous Leader Epoch was: $leaderEpoch")
 
   //We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
   leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
-  newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))
-
+  leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
   zkVersion = partitionStateInfo.basePartitionState.zkVersion
   val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)
 
-  val leaderReplica = getReplica().get
   val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
   val curTimeMs = time.milliseconds
   // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
@@ -344,6 +349,7 @@ class Partition(val topic: String,
   (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
   inSyncReplicas = Set.empty[Replica]
   leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
+  leaderEpochStartOffsetOpt = None
   zkVersion = partitionStateInfo.basePartitionState.zkVersion
 
   // If the leader is unchanged and the epochs are no more than one change apart, indicate that no follower changes are required
@@ -388,7 +394,11 @@ class Partition(val topic: String,
 
   /**
    * Check and maybe expand the ISR of the partition.
-   * A replica will be added to ISR if its LEO >= current hw of the partition.
+   * A replica will be added to ISR if its LEO >= current hw of the partition and it is caught up to
+   * an offset within the current leader epoch. A replica must be caught up to the current leader
+   * epoch before it can join ISR, because otherwise, if there is committed data between current
+   * leader's HW and LEO, the replica may become the leader before it fetches the committed data
+   * and the data will be lost.
    *
    * Technically, a replica shouldn't be in ISR if it hasn't caught up for longer than replicaLagTimeMaxMs,
    * even if its log end offset is >= HW. However, to be consistent with how the follower determines
@@ -405,9 +415,11 @@ class Partition(val topic: String,
 case Some(leaderReplica) =>
   val replica = getReplica(replicaId).get
   val leaderHW = 

[jira] [Commented] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector

2018-08-28 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595315#comment-16595315
 ] 

Ted Yu commented on KAFKA-7304:
---

[~yuyang08]:
Please confirm that the gap between the following two metrics kept getting 
bigger:
{code}
this.connectionClosed = sensor("connections-closed:" + tagsSuffix);

this.connectionCreated = sensor("connections-created:" + tagsSuffix);
{code}
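
One way to check whether the gap keeps growing is sketched below. It assumes the two metrics have been exported as cumulative counts sampled at equal intervals; the numbers are made up for illustration, and `connection_gap` and `keeps_growing` are hypothetical helper names.

```python
def connection_gap(created_samples, closed_samples):
    """Per-sample difference between cumulative created and closed connection counts."""
    return [c - d for c, d in zip(created_samples, closed_samples)]


def keeps_growing(gaps):
    """True when every sample's gap is larger than the one before it."""
    return all(later > earlier for earlier, later in zip(gaps, gaps[1:]))


# Made-up cumulative counts, for illustration only.
created = [1000, 5000, 9000, 14000]
closed = [900, 4200, 7400, 11000]

gaps = connection_gap(created, closed)
print(gaps)
print(keeps_growing(gaps))
```

A steadily widening gap would suggest that connections are being created faster than they are closed, which would fit the growing channel maps seen in the heap dumps.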






[jira] [Commented] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector

2018-08-28 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595251#comment-16595251
 ] 

Ted Yu commented on KAFKA-7304:
---

One potential cause for the delayed closing of idle connections is that 
ConsumerNetworkClient#poll does the following before calling {{client.poll}} 
(leading to selector.poll where selector.closingChannels is checked):
{code}
long pollDelayMs = trySend(timer.currentTimeMs());
{code}
It seems that when selector.closingChannels grows to a certain size, we should 
proactively release the KafkaChannels inside it.
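
The idea of releasing entries once the map passes a size threshold could be sketched like this. It is a toy model with hypothetical names, not Kafka's Selector, and the oldest-first eviction order is an assumption of this example.

```python
from collections import OrderedDict


class ClosingChannels:
    """Toy stand-in for a closing-channel map with a size cap (not Kafka's Selector)."""

    def __init__(self, max_size):
        self.max_size = max_size
        self._channels = OrderedDict()
        self.released = []  # ids released early, kept only so the sketch is observable

    def add(self, channel_id, channel):
        self._channels[channel_id] = channel
        # Once over the cap, release the oldest entries first.
        while len(self._channels) > self.max_size:
            oldest_id, _ = self._channels.popitem(last=False)
            self.released.append(oldest_id)

    def __len__(self):
        return len(self._channels)
```

With a cap in place, the map's memory footprint is bounded regardless of how often poll gets to inspect it.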

> memory leakage in org.apache.kafka.common.network.Selector
> --
>
> Key: KAFKA-7304
> URL: https://issues.apache.org/jira/browse/KAFKA-7304
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Yu Yang
>Priority: Critical
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
> Attachments: 7304.v4.txt, 7304.v7.txt, Screen Shot 2018-08-16 at 
> 11.04.16 PM.png, Screen Shot 2018-08-16 at 11.06.38 PM.png, Screen Shot 
> 2018-08-16 at 12.41.26 PM.png, Screen Shot 2018-08-16 at 4.26.19 PM.png, 
> Screen Shot 2018-08-17 at 1.03.35 AM.png, Screen Shot 2018-08-17 at 1.04.32 
> AM.png, Screen Shot 2018-08-17 at 1.05.30 AM.png
>
>
> We are testing secured writing to Kafka through SSL. At small scale, 
> SSL writing to Kafka was fine. However, when we enabled SSL writing at a 
> larger scale (>40k clients writing concurrently), the Kafka brokers soon hit 
> OutOfMemory errors with a 4 GB memory setting. We tried increasing the 
> heap size to 10 GB, but encountered the same issue. 
> We took a few heap dumps and found that most of the heap memory is 
> referenced through org.apache.kafka.common.network.Selector objects. There 
> are two channel map fields in Selector. It seems that objects are somehow 
> not removed from these maps in a timely manner. 
> One observation is that the memory leak seems related to Kafka partition 
> leader changes. If a broker restart or similar event in the cluster causes a 
> partition leadership change, the brokers may hit the OOM issue faster. 
> {code}
> private final Map channels;
> private final Map closingChannels;
> {code}
> Please see the attached images and the following link for a sample GC 
> analysis. 
> http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMTcvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS0yLTM5LTM0
> The command line for running Kafka: 
> {code}
> java -Xms10g -Xmx10g -XX:NewSize=512m -XX:MaxNewSize=512m 
> -Xbootclasspath/p:/usr/local/libs/bcp -XX:MetaspaceSize=128m -XX:+UseG1GC 
> -XX:MaxGCPauseMillis=25 -XX:InitiatingHeapOccupancyPercent=35 
> -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=25 
> -XX:MaxMetaspaceFreeRatio=75 -XX:+PrintGCDetails -XX:+PrintGCDateStamps 
> -XX:+PrintTenuringDistribution -Xloggc:/var/log/kafka/gc.log 
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=40 -XX:GCLogFileSize=50M 
> -Djava.awt.headless=true 
> -Dlog4j.configuration=file:/etc/kafka/log4j.properties 
> -Dcom.sun.management.jmxremote 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false 
> -Dcom.sun.management.jmxremote.port= 
> -Dcom.sun.management.jmxremote.rmi.port= -cp /usr/local/libs/*  
> kafka.Kafka /etc/kafka/server.properties
> {code}
> We use Java 1.8.0_102 and have applied a TLS patch that reduces the 
> X509Factory.certCache map size from 750 to 20. 
> {code}
> java -version
> java version "1.8.0_102"
> Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7326) Let KStream.print() to flush on each printed line

2018-08-28 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-7326:
--

Assignee: huxihx

> Let KStream.print() to flush on each printed line
> -
>
> Key: KAFKA-7326
> URL: https://issues.apache.org/jira/browse/KAFKA-7326
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Guozhang Wang
>Assignee: huxihx
>Priority: Major
>  Labels: newbie++
>
> Today, {{KStream.print()}} is implemented as a special "foreach" function as 
> below:
> {code}
> @Override
> public void apply(final K key, final V value) {
>     final String data = String.format("[%s]: %s", label, 
>         mapper.apply(key, value));
>     printWriter.println(data);
> }
> {code}
> Note that since {{this.printWriter = new PrintWriter(new 
> OutputStreamWriter(outputStream, StandardCharsets.UTF_8));}}, without 
> flushing the writer we do not guarantee that printed lines are written to the 
> underlying {{outputStream}} in time.
> Since {{KStream.print()}} is mainly for debugging / testing / demoing 
> purposes, not for performance, I think it is okay to enforce auto flushing.
> This would include:
> 1. Set {{autoFlush}} in the constructor of printWriter.
> 2. Document in the Javadoc of {{KStream.print}} that this is for debugging / 
> testing purposes only, that it will try to flush on each printed record, and 
> hence that it should not be used in production if performance is a key 
> requirement.
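
The buffering behavior described above can be illustrated with a minimal, standalone sketch. It is not the Kafka Streams code: the class name {{AutoFlushDemo}} and the helper method are made up for this example, and a {{ByteArrayOutputStream}} stands in for the real output stream. It only uses the two-argument {{PrintWriter(Writer, boolean)}} constructor from the JDK to compare how many bytes reach the underlying stream right after a single {{println}}.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; not part of Kafka Streams.
class AutoFlushDemo {

    // Writes one line and reports how many bytes have reached the
    // underlying stream before any explicit flush() or close().
    static int bytesVisibleAfterPrintln(boolean autoFlush) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        PrintWriter writer = new PrintWriter(
                new OutputStreamWriter(out, StandardCharsets.UTF_8), autoFlush);
        writer.println("[label]: key, value");
        return out.size();
    }

    public static void main(String[] args) {
        System.out.println("without autoFlush: " + bytesVisibleAfterPrintln(false));
        System.out.println("with autoFlush:    " + bytesVisibleAfterPrintln(true));
    }
}
```

With {{autoFlush}} enabled, each {{println}} pushes the line through to the stream at the cost of a flush per record, which is the trade-off the description accepts for a debugging-only operator.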





[jira] [Comment Edited] (KAFKA-6458) ERROR Found invalid messages during fetch for partition [__consumer_offsets,20] offset 0 error Record is corrupt (stored crc = 73, computed crc = 2559213387) (kafk

2018-08-28 Thread Davor Poldrugo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595065#comment-16595065
 ] 

Davor Poldrugo edited comment on KAFKA-6458 at 8/28/18 2:41 PM:


Any news about this bug?
We have stumbled upon the same problem with kafka-manager.

Kafka Manager version: 1.3.3.17

Kafka clients version: 0.10.0.1

Kafka / Broker version: 1.1


was (Author: dpoldrugo):
Any news about this bug?
We have stumbled upon on the same problem on kafka-manager.

Kafka Manager version: 1.3.3.17

Kafka clients version: 0.10.0.1.

Kafka / Broker version: 1.1

> ERROR Found invalid messages during fetch for partition 
> [__consumer_offsets,20] offset 0 error Record is corrupt (stored crc = 73, 
> computed crc = 2559213387) (kafka.server.ReplicaFetcherThread)
> -
>
> Key: KAFKA-6458
> URL: https://issues.apache.org/jira/browse/KAFKA-6458
> Project: Kafka
>  Issue Type: Bug
>  Components: log, offset manager
>Affects Versions: 0.10.2.1
> Environment: CentOS Linux release 7.2.1511
>Reporter: VinayKumar
>Priority: Major
>
> I see the below ERRORs in the log file. Restarting the Kafka service is not 
> helping to fix it. 
> Can someone please help me eliminate or fix these errors?
> [2018-01-17 13:52:15,626] ERROR Found invalid messages during fetch for 
> partition [__consumer_offsets,11] offset 0 error Record is corrupt (stored 
> crc = 81, computed crc = 1264288837) (kafka.server.ReplicaFetcherThread)
> [2018-01-17 13:52:15,626] ERROR Found invalid messages during fetch for 
> partition [__consumer_offsets,42] offset 0 error Record is corrupt (stored 
> crc = 73, computed crc = 1222016777) (kafka.server.ReplicaFetcherThread)
> [2018-01-17 13:52:15,626] ERROR Found invalid messages during fetch for 
> partition [__consumer_offsets,20] offset 0 error Record is corrupt (stored 
> crc = 73, computed crc = 2559213387) (kafka.server.ReplicaFetcherThread)
> [2018-01-17 13:52:15,777] ERROR Found invalid messages during fetch for 
> partition [__consumer_offsets,22] offset 65 error Record is corrupt (stored 
> crc = 123, computed crc = 2233168612) (kafka.server.ReplicaFetcherThread)
> [2018-01-17 13:52:15,777] ERROR Found invalid messages during fetch for 
> partition [__consumer_offsets,39] offset 20 error Record is corrupt (stored 
> crc = 71, computed crc = 2065457751) (kafka.server.ReplicaFetcherThread)





[jira] [Commented] (KAFKA-6458) ERROR Found invalid messages during fetch for partition [__consumer_offsets,20] offset 0 error Record is corrupt (stored crc = 73, computed crc = 2559213387) (kafka.ser

2018-08-28 Thread Davor Poldrugo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595065#comment-16595065
 ] 

Davor Poldrugo commented on KAFKA-6458:
---

Any news about this bug?
We have stumbled upon the same problem with kafka-manager.

Kafka Manager version: 1.3.3.17

Kafka clients version: 0.10.0.1.

Kafka / Broker version: 1.1

> ERROR Found invalid messages during fetch for partition 
> [__consumer_offsets,20] offset 0 error Record is corrupt (stored crc = 73, 
> computed crc = 2559213387) (kafka.server.ReplicaFetcherThread)
> -
>
> Key: KAFKA-6458
> URL: https://issues.apache.org/jira/browse/KAFKA-6458
> Project: Kafka
>  Issue Type: Bug
>  Components: log, offset manager
>Affects Versions: 0.10.2.1
> Environment: CentOS Linux release 7.2.1511
>Reporter: VinayKumar
>Priority: Major
>
> I see the below ERRORs in the log file. Restarting the Kafka service is not 
> helping to fix it. 
> Can someone please help me eliminate or fix these errors?
> [2018-01-17 13:52:15,626] ERROR Found invalid messages during fetch for 
> partition [__consumer_offsets,11] offset 0 error Record is corrupt (stored 
> crc = 81, computed crc = 1264288837) (kafka.server.ReplicaFetcherThread)
> [2018-01-17 13:52:15,626] ERROR Found invalid messages during fetch for 
> partition [__consumer_offsets,42] offset 0 error Record is corrupt (stored 
> crc = 73, computed crc = 1222016777) (kafka.server.ReplicaFetcherThread)
> [2018-01-17 13:52:15,626] ERROR Found invalid messages during fetch for 
> partition [__consumer_offsets,20] offset 0 error Record is corrupt (stored 
> crc = 73, computed crc = 2559213387) (kafka.server.ReplicaFetcherThread)
> [2018-01-17 13:52:15,777] ERROR Found invalid messages during fetch for 
> partition [__consumer_offsets,22] offset 65 error Record is corrupt (stored 
> crc = 123, computed crc = 2233168612) (kafka.server.ReplicaFetcherThread)
> [2018-01-17 13:52:15,777] ERROR Found invalid messages during fetch for 
> partition [__consumer_offsets,39] offset 20 error Record is corrupt (stored 
> crc = 71, computed crc = 2065457751) (kafka.server.ReplicaFetcherThread)





[jira] [Created] (KAFKA-7352) KIP-368: Allow SASL Connections to Periodically Re-Authenticate

2018-08-28 Thread Ron Dagostino (JIRA)
Ron Dagostino created KAFKA-7352:


 Summary: KIP-368: Allow SASL Connections to Periodically 
Re-Authenticate
 Key: KAFKA-7352
 URL: https://issues.apache.org/jira/browse/KAFKA-7352
 Project: Kafka
  Issue Type: Improvement
  Components: clients, core
Reporter: Ron Dagostino
Assignee: Ron Dagostino


KIP-368: Allow SASL Connections to Periodically Re-Authenticate

The adoption of KIP-255: OAuth Authentication via SASL/OAUTHBEARER in release 
2.0.0 creates the possibility of using information in the bearer token to make 
authorization decisions.  Unfortunately, however, Kafka connections are 
long-lived, so there is no ability to change the bearer token associated with a 
particular connection.  Allowing SASL connections to periodically 
re-authenticate would resolve this.

In addition to this motivation there are two others that are security-related.  
First, to eliminate access to Kafka the current requirement is to remove all 
authorizations (i.e. remove all ACLs).  This is necessary because of the 
long-lived nature of the connections.  It is operationally simpler to shut off 
access at the point of authentication, and with the release of KIP-86: 
Configurable SASL Callback Handlers it is going to become more and more likely 
that installations will authenticate users against external directories (e.g. 
via LDAP).  The ability to stop Kafka access by simply disabling an account in 
an LDAP directory (for example) is desirable.

The second motivating factor for re-authentication related to security is that 
the use of short-lived tokens is a common OAuth security recommendation, but 
issuing a short-lived token to a Kafka client (or a broker when OAUTHBEARER is 
the inter-broker protocol) currently has no benefit because once a client is 
connected to a broker the client is never challenged again and the connection 
may remain intact beyond the token expiration time (and may remain intact 
indefinitely under perfect circumstances).

This KIP proposes adding the ability for clients (and brokers when OAUTHBEARER 
is the inter-broker protocol) to re-authenticate their connections to brokers 
and have the new bearer token appear on their session rather than the old one.
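
To make the motivation concrete, here is a small illustrative model of a session whose token can expire and be replaced. It is only a sketch under stated assumptions: the class {{TokenSession}} and its methods are hypothetical names invented for this example, and nothing here reflects the actual KIP-368 design or any Kafka API.

```java
import java.time.Instant;

// Hypothetical model of a connection's credential; not Kafka's actual API.
final class TokenSession {
    private String token;
    private Instant expiresAt;

    TokenSession(String token, Instant expiresAt) {
        this.token = token;
        this.expiresAt = expiresAt;
    }

    // True once the current token has reached its expiry time. Without
    // re-authentication the connection would simply stay open past this point.
    boolean needsReauthentication(Instant now) {
        return !now.isBefore(expiresAt);
    }

    // Replaces the old token on the session with the new one.
    void reauthenticate(String newToken, Instant newExpiresAt) {
        this.token = newToken;
        this.expiresAt = newExpiresAt;
    }

    String token() {
        return token;
    }
}
```

In this model, a periodic check of {{needsReauthentication}} followed by {{reauthenticate}} is what would give short-lived tokens a practical effect on a long-lived connection.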





[jira] [Updated] (KAFKA-7351) Wrong logic in method ClusterConnectionStates.isDisconnected

2018-08-28 Thread Boris Zhguchev (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boris Zhguchev updated KAFKA-7351:
--
Description: 
I think the method 

org.apache.kafka.clients.ClusterConnectionStates.isDisconnected

has wrong logic when processing unknown ids.
{code:java}
@Before
public void setup() {
    this.connectionStates = new ClusterConnectionStates(10_000, 60_000);
}

@Test
public void testIsDisconnected() {
    boolean connected = connectionStates.isConnected("fake_node");
    boolean disconnected = connectionStates.isDisconnected("fake_node");

    assertFalse(connected); // false
    assertFalse(disconnected); // false
}
{code}
It may be related to this code block:
{code:java}
public boolean isDisconnected(String id) {
    NodeConnectionState state = nodeState.get(id);
    // maybe better: state == null ? true : state.isDisconnected()
    return state != null && state.state.isDisconnected();
}
{code}
[link|https://github.com/apache/kafka/blob/19b8ac55c389e4b2022476431a28c8431caed52a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java#L252]
 to github

  was:
I think the method ClusterConnectionStates.isDisconnected has wrong logic in 
proccesing wrong ids.
{code:java}
@Before
public void setup() {
this.connectionStates = new ClusterConnectionStates(10_000, 60_000);
}

@Test
public void testIsDisconnected(){
boolean connected = connectionStates.isConnected("fake_node");
boolean disconnected = connectionStates.isDisconnected("fake_node");

assertFalse(connected); // false
assertFalse(disconnected); // false
}
{code}
It can be related with that that code block:
{code:java}
public boolean isDisconnected(String id) {
  NodeConnectionState state = nodeState.get(id);
// may be better is state == null ? true : state.isDisconnected()
  return state != null && state.state.isDisconnected(); 
}
{code}
[link|https://github.com/apache/kafka/blob/19b8ac55c389e4b2022476431a28c8431caed52a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java#L252]
 to github


> Wrong logic in method ClusterConnectionStates.isDisconnected
> 
>
> Key: KAFKA-7351
> URL: https://issues.apache.org/jira/browse/KAFKA-7351
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: Boris Zhguchev
>Priority: Minor
>
> I think the method 
> org.apache.kafka.clients.ClusterConnectionStates.isDisconnected
> has wrong logic when processing unknown ids.
> {code:java}
> @Before
> public void setup() {
>     this.connectionStates = new ClusterConnectionStates(10_000, 60_000);
> }
> @Test
> public void testIsDisconnected() {
>     boolean connected = connectionStates.isConnected("fake_node");
>     boolean disconnected = connectionStates.isDisconnected("fake_node");
>     assertFalse(connected); // false
>     assertFalse(disconnected); // false
> }
> {code}
> It may be related to this code block:
> {code:java}
> public boolean isDisconnected(String id) {
>     NodeConnectionState state = nodeState.get(id);
>     // maybe better: state == null ? true : state.isDisconnected()
>     return state != null && state.state.isDisconnected();
> }
> {code}
> [link|https://github.com/apache/kafka/blob/19b8ac55c389e4b2022476431a28c8431caed52a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java#L252]
>  to github





[jira] [Created] (KAFKA-7351) Wrong logic in method ClusterConnectionStates.isDisconnected

2018-08-28 Thread Boris Zhguchev (JIRA)
Boris Zhguchev created KAFKA-7351:
-

 Summary: Wrong logic in method 
ClusterConnectionStates.isDisconnected
 Key: KAFKA-7351
 URL: https://issues.apache.org/jira/browse/KAFKA-7351
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.0.0
Reporter: Boris Zhguchev


I think the method ClusterConnectionStates.isDisconnected has wrong logic when 
processing unknown ids.
{code:java}
@Before
public void setup() {
    this.connectionStates = new ClusterConnectionStates(10_000, 60_000);
}

@Test
public void testIsDisconnected() {
    boolean connected = connectionStates.isConnected("fake_node");
    boolean disconnected = connectionStates.isDisconnected("fake_node");

    assertFalse(connected); // false
    assertFalse(disconnected); // false
}
{code}
It may be related to this code block:
{code:java}
public boolean isDisconnected(String id) {
    NodeConnectionState state = nodeState.get(id);
    // maybe better: state == null ? true : state.isDisconnected()
    return state != null && state.state.isDisconnected();
}
{code}
[link|https://github.com/apache/kafka/blob/19b8ac55c389e4b2022476431a28c8431caed52a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java#L252]
 to github
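
The difference between the two null-handling choices can be shown with a simplified stand-in. This sketch does not use the real Kafka class: {{ConnectionStatesSketch}}, its {{State}} enum and both method names are hypothetical, and the map only mimics the lookup by node id quoted above.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical stand-in for the connection-state lookup;
// not the real ClusterConnectionStates class.
class ConnectionStatesSketch {
    enum State { DISCONNECTED, CONNECTING, READY }

    private final Map<String, State> nodeState = new HashMap<>();

    void put(String id, State state) {
        nodeState.put(id, state);
    }

    boolean isConnected(String id) {
        return nodeState.get(id) == State.READY;
    }

    // Mirrors the quoted logic: an unknown id is reported as not disconnected.
    boolean isDisconnectedCurrent(String id) {
        State state = nodeState.get(id);
        return state != null && state == State.DISCONNECTED;
    }

    // Mirrors the alternative from the report: an unknown id counts as disconnected.
    boolean isDisconnectedProposed(String id) {
        State state = nodeState.get(id);
        return state == null || state == State.DISCONNECTED;
    }
}
```

For an id that was never registered, the first variant makes both {{isConnected}} and the disconnected check return false, which is the combination the test in the report observes. The second variant treats "unknown" as "disconnected". Which of the two is the intended contract is the open question of this issue.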





[jira] [Updated] (KAFKA-7318) Should introduce a offset reset policy to consume only the messages after subscribing

2018-08-28 Thread joechen8...@gmail.com (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

joechen8...@gmail.com updated KAFKA-7318:
-
Attachment: KafkaTest.java

> Should introduce a offset reset policy to consume only the messages after 
> subscribing
> -
>
> Key: KAFKA-7318
> URL: https://issues.apache.org/jira/browse/KAFKA-7318
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 1.1.0, 1.1.1, 2.0.0
>Reporter: joechen8...@gmail.com
>Priority: Major
> Attachments: KafkaTest.java, KafkaTest.java, KafkaTest.java
>
>
> In our situation, we want consumers to consume only the messages that were 
> produced after subscribing.
> Currently, Kafka supports 3 strategies for auto.offset.reset, but it seems 
> that none of them supports the behavior we want.
>  * {{latest}} (the default): if a consumer subscribes to a new topic and then 
> closes, and messages are produced in the meantime, the consumer cannot poll 
> those messages.
>  * {{earliest}}: the consumer may consume all the messages that were on the 
> topic before subscribing.
>  * {{none}}: not in this scope.
> Before version 1.1.0, we made the consumer poll and commit after subscribing, 
> as below. This marked the offset as 0 and worked (enable.auto.commit is 
> false).
>  
> {code:java}
> consumer.subscribe(topics, consumerRebalanceListener);
> if (consumer.assignment().isEmpty()) {
>     consumer.poll(0);
>     consumer.commitSync();
> }
> {code}
> After upgrading the clients to >=1.1.0, this is broken. It seems it was broken 
> by the fix 
> [KAFKA-6397|https://github.com/apache/kafka/commit/677881afc52485aef94150be50d6258d7a340071#diff-267b7c1e68156c1301c56be63ae41dd0],
>  but I am not sure about that. I then tried invoking 
> consumer.position(partitions) in onPartitionsAssigned of 
> ConsumerRebalanceListener, and it works again, but it looks strange to get 
> the position and then do nothing with it.
>  
> So we want to know whether there is a formal way to do this. Introducing 
> another strategy for auto.offset.reset that consumes only the messages 
> produced after the consumer subscribes would be ideal.
>  
>  





[jira] [Commented] (KAFKA-7269) KStream.merge is not documented

2018-08-28 Thread Luca Pette (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594715#comment-16594715
 ] 

Luca Pette commented on KAFKA-7269:
---

[~mjsax] approved this one already (thank you very much!). Is there anything 
else I should do so we can merge it? I'm not 100% sure I've done everything 
correctly, as it's my first contribution.

> KStream.merge is not documented
> ---
>
> Key: KAFKA-7269
> URL: https://issues.apache.org/jira/browse/KAFKA-7269
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0
>Reporter: John Roesler
>Assignee: Luca Pette
>Priority: Major
>  Labels: beginner, newbie
>
> If I understand the operator correctly, it should be documented as a 
> stateless transformation at 
> https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#stateless-transformations





[jira] [Commented] (KAFKA-4321) Make client.id available to MetricReporter and (De)Serializers in clients

2018-08-28 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594698#comment-16594698
 ] 

Manikumar commented on KAFKA-4321:
--

Passing client.id to MetricReporters is handled in KAFKA-6123.

> Make client.id available to MetricReporter and (De)Serializers in clients
> -
>
> Key: KAFKA-4321
> URL: https://issues.apache.org/jira/browse/KAFKA-4321
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Sumit Arrawatia
>Assignee: Sumit Arrawatia
>Priority: Minor
>
> Currently, only the interceptors get the default client.id (which is 
> generated if client.id is not set by the users) in configure(...) method. It 
> is useful to pass the default client.id for the MetricReporters and 
> (De)Serializers too.





[jira] [Assigned] (KAFKA-7276) Consider using re2j to speed up regex operations

2018-08-28 Thread kevin.chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kevin.chen reassigned KAFKA-7276:
-

Assignee: kevin.chen

> Consider using re2j to speed up regex operations
> 
>
> Key: KAFKA-7276
> URL: https://issues.apache.org/jira/browse/KAFKA-7276
> Project: Kafka
>  Issue Type: Task
>  Components: packaging
>Reporter: Ted Yu
>Assignee: kevin.chen
>Priority: Major
>
> https://github.com/google/re2j
> re2j claims to do linear time regular expression matching in Java.
> Its benefit is most obvious for deeply nested regex (such as a | b | c | d).
> We should consider using re2j to speed up regex operations.





[jira] [Updated] (KAFKA-7318) Should introduce a offset reset policy to consume only the messages after subscribing

2018-08-28 Thread joechen8...@gmail.com (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

joechen8...@gmail.com updated KAFKA-7318:
-
Attachment: KafkaTest.java

> Should introduce a offset reset policy to consume only the messages after 
> subscribing
> -
>
> Key: KAFKA-7318
> URL: https://issues.apache.org/jira/browse/KAFKA-7318
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 1.1.0, 1.1.1, 2.0.0
>Reporter: joechen8...@gmail.com
>Priority: Major
> Attachments: KafkaTest.java, KafkaTest.java
>
>
> In our situation, we want consumers to consume only the messages that were 
> produced after subscribing.
> Currently, Kafka supports 3 strategies for auto.offset.reset, but it seems 
> that none of them supports the behavior we want.
>  * {{latest}} (the default): if a consumer subscribes to a new topic and then 
> closes, and messages are produced in the meantime, the consumer cannot poll 
> those messages.
>  * {{earliest}}: the consumer may consume all the messages that were on the 
> topic before subscribing.
>  * {{none}}: not in this scope.
> Before version 1.1.0, we made the consumer poll and commit after subscribing, 
> as below. This marked the offset as 0 and worked (enable.auto.commit is 
> false).
>  
> {code:java}
> consumer.subscribe(topics, consumerRebalanceListener);
> if (consumer.assignment().isEmpty()) {
>     consumer.poll(0);
>     consumer.commitSync();
> }
> {code}
> After upgrading the clients to >=1.1.0, this is broken. It seems it was broken 
> by the fix 
> [KAFKA-6397|https://github.com/apache/kafka/commit/677881afc52485aef94150be50d6258d7a340071#diff-267b7c1e68156c1301c56be63ae41dd0],
>  but I am not sure about that. I then tried invoking 
> consumer.position(partitions) in onPartitionsAssigned of 
> ConsumerRebalanceListener, and it works again, but it looks strange to get 
> the position and then do nothing with it.
>  
> So we want to know whether there is a formal way to do this. Introducing 
> another strategy for auto.offset.reset that consumes only the messages 
> produced after the consumer subscribes would be ideal.
>  
>  





[jira] [Commented] (KAFKA-7318) Should introduce a offset reset policy to consume only the messages after subscribing

2018-08-28 Thread joechen8...@gmail.com (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594651#comment-16594651
 ] 

joechen8...@gmail.com commented on KAFKA-7318:
--

Here is the test case.

[^KafkaTest.java]

> Should introduce a offset reset policy to consume only the messages after 
> subscribing
> -
>
> Key: KAFKA-7318
> URL: https://issues.apache.org/jira/browse/KAFKA-7318
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 1.1.0, 1.1.1, 2.0.0
>Reporter: joechen8...@gmail.com
>Priority: Major
> Attachments: KafkaTest.java, KafkaTest.java
>
>
> In our situation, we want consumers to consume only the messages that were 
> produced after subscribing.
> Currently, Kafka supports 3 strategies for auto.offset.reset, but it seems 
> that none of them supports the behavior we want.
>  * {{latest}} (the default): if a consumer subscribes to a new topic and then 
> closes, and messages are produced in the meantime, the consumer cannot poll 
> those messages.
>  * {{earliest}}: the consumer may consume all the messages that were on the 
> topic before subscribing.
>  * {{none}}: not in this scope.
> Before version 1.1.0, we made the consumer poll and commit after subscribing, 
> as below. This marked the offset as 0 and worked (enable.auto.commit is 
> false).
>  
> {code:java}
> consumer.subscribe(topics, consumerRebalanceListener);
> if (consumer.assignment().isEmpty()) {
>     consumer.poll(0);
>     consumer.commitSync();
> }
> {code}
> After upgrading the clients to >=1.1.0, this is broken. It seems it was broken 
> by the fix 
> [KAFKA-6397|https://github.com/apache/kafka/commit/677881afc52485aef94150be50d6258d7a340071#diff-267b7c1e68156c1301c56be63ae41dd0],
>  but I am not sure about that. I then tried invoking 
> consumer.position(partitions) in onPartitionsAssigned of 
> ConsumerRebalanceListener, and it works again, but it looks strange to get 
> the position and then do nothing with it.
>  
> So we want to know whether there is a formal way to do this. Introducing 
> another strategy for auto.offset.reset that consumes only the messages 
> produced after the consumer subscribes would be ideal.
>  
>  





[jira] [Comment Edited] (KAFKA-7318) Should introduce a offset reset policy to consume only the messages after subscribing

2018-08-28 Thread joechen8...@gmail.com (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594649#comment-16594649
 ] 

joechen8...@gmail.com edited comment on KAFKA-7318 at 8/28/18 7:29 AM:
---

Here is the test case.

[^KafkaTest.java]


was (Author: joechen):
Here is the test case.

[^KafkaTest.java]

> Should introduce a offset reset policy to consume only the messages after 
> subscribing
> -
>
> Key: KAFKA-7318
> URL: https://issues.apache.org/jira/browse/KAFKA-7318
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 1.1.0, 1.1.1, 2.0.0
>Reporter: joechen8...@gmail.com
>Priority: Major
> Attachments: KafkaTest.java
>
>
> In our situation, we want consumers to consume only the messages that were 
> produced after subscribing.
> Currently, Kafka supports 3 strategies for auto.offset.reset, but it seems 
> that none of them supports the behavior we want.
>  * {{latest}} (the default): if a consumer subscribes to a new topic and then 
> closes, and messages are produced in the meantime, the consumer cannot poll 
> those messages.
>  * {{earliest}}: the consumer may consume all the messages that were on the 
> topic before subscribing.
>  * {{none}}: not in this scope.
> Before version 1.1.0, we made the consumer poll and commit after subscribing, 
> as below. This marked the offset as 0 and worked (enable.auto.commit is 
> false).
>  
> {code:java}
> consumer.subscribe(topics, consumerRebalanceListener);
> if (consumer.assignment().isEmpty()) {
>     consumer.poll(0);
>     consumer.commitSync();
> }
> {code}
> After upgrading the clients to >=1.1.0, this is broken. It seems it was broken 
> by the fix 
> [KAFKA-6397|https://github.com/apache/kafka/commit/677881afc52485aef94150be50d6258d7a340071#diff-267b7c1e68156c1301c56be63ae41dd0],
>  but I am not sure about that. I then tried invoking 
> consumer.position(partitions) in onPartitionsAssigned of 
> ConsumerRebalanceListener, and it works again, but it looks strange to get 
> the position and then do nothing with it.
>  
> So we want to know whether there is a formal way to do this. Introducing 
> another strategy for auto.offset.reset that consumes only the messages 
> produced after the consumer subscribes would be ideal.
>  
>  





[jira] [Commented] (KAFKA-7318) Should introduce a offset reset policy to consume only the messages after subscribing

2018-08-28 Thread joechen8...@gmail.com (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594649#comment-16594649
 ] 

joechen8...@gmail.com commented on KAFKA-7318:
--

Here is the test case.

[^KafkaTest.java]

> Should introduce a offset reset policy to consume only the messages after 
> subscribing
> -
>
> Key: KAFKA-7318
> URL: https://issues.apache.org/jira/browse/KAFKA-7318
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 1.1.0, 1.1.1, 2.0.0
>Reporter: joechen8...@gmail.com
>Priority: Major
> Attachments: KafkaTest.java
>
>
> In our situation, we want consumers to consume only the messages that were 
> produced after subscribing.
> Currently, Kafka supports 3 strategies for auto.offset.reset, but it seems 
> that none of them supports the behavior we want.
>  * {{latest}} (the default): if a consumer subscribes to a new topic and then 
> closes, and messages are produced in the meantime, the consumer cannot poll 
> those messages.
>  * {{earliest}}: the consumer may consume all the messages that were on the 
> topic before subscribing.
>  * {{none}}: not in this scope.
> Before version 1.1.0, we made the consumer poll and commit after subscribing, 
> as below. This marked the offset as 0 and worked (enable.auto.commit is 
> false).
>  
> {code:java}
> consumer.subscribe(topics, consumerRebalanceListener);
> if (consumer.assignment().isEmpty()) {
>     consumer.poll(0);
>     consumer.commitSync();
> }
> {code}
> After upgrading the clients to >=1.1.0, this is broken. It seems it was broken 
> by the fix 
> [KAFKA-6397|https://github.com/apache/kafka/commit/677881afc52485aef94150be50d6258d7a340071#diff-267b7c1e68156c1301c56be63ae41dd0],
>  but I am not sure about that. I then tried invoking 
> consumer.position(partitions) in onPartitionsAssigned of 
> ConsumerRebalanceListener, and it works again, but it looks strange to get 
> the position and then do nothing with it.
>  
> So we want to know whether there is a formal way to do this. Introducing 
> another strategy for auto.offset.reset that consumes only the messages 
> produced after the consumer subscribes would be ideal.
>  
>  





[jira] [Comment Edited] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector

2018-08-28 Thread Yu Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594320#comment-16594320
 ] 

Yu Yang edited comment on KAFKA-7304 at 8/28/18 7:28 AM:
-

After more experiments, we currently think that the issue is caused by too many 
idle ssl connections that are not closed on time. 

I set up a test cluster of 24 brokers using d2.8xlarge instances, allocated 
32gb for kafka process heap space, and have ~40k clients writes to a test topic 
on this cluster. The following graph shows the jvm heap usage and gc activity 
in the past 24 hours or so. The cluster ran fine with low heap usage and low 
cpu usage.  However, the heap usage and the cpu usage of brokers increased 
sharply when we added or terminated brokers in this cluster (for broker 
termination, there was no topic partitions allocated on those terminated 
nodes).  

http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMjgvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS01LTQzLTU=

http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMjgvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS03LTYtMzU=

Sometimes the cluster can be recovered by turning off the ssl writing traffic 
to the cluster, letting the brokers garbage collect the objects in the old 
gen, and resuming the ssl writing traffic.  Sometimes the cluster still could 
not recover fully due to a dramatic increase in heap size and high cpu usage when 
we turned on the ssl writing traffic again. 
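
To make the idle-connection hypothesis concrete, here is a minimal sketch of how idle connections could be tracked and expired so that their entries (and buffers) do not pile up. It is an illustration only and not the Selector code: the class IdleConnectionTracker and its methods touch and expire are hypothetical names, and it assumes timestamps passed in never go backwards.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: tracks last activity per connection id and expires idle ones. */
public class IdleConnectionTracker {
    private final long maxIdleMs;
    // Access-ordered map: the least recently active connection is iterated first.
    private final LinkedHashMap<String, Long> lastActive =
            new LinkedHashMap<>(16, 0.75f, true);

    public IdleConnectionTracker(long maxIdleMs) {
        this.maxIdleMs = maxIdleMs;
    }

    /** Record activity on a connection; assumes nowMs never decreases between calls. */
    public void touch(String connectionId, long nowMs) {
        lastActive.put(connectionId, nowMs);
    }

    /** Remove and return every connection that has been idle for longer than maxIdleMs. */
    public List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastActive.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() <= maxIdleMs) {
                break; // every later entry was active more recently
            }
            expired.add(entry.getKey());
            it.remove();
        }
        return expired;
    }

    /** Number of connections still tracked. */
    public int size() {
        return lastActive.size();
    }
}
```

If expire is not called often enough, or entries are never removed on close, the map keeps growing with the number of clients, which is the kind of growth the heap dumps would show.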


was (Author: yuyang08):
After more experiments, we currently think that the issue is caused by too many 
idle ssl connections that are not closed in time. 

I set up a test cluster of 24 brokers using d2.8xlarge instances, allocated 
32gb for kafka process heap space, and had ~40k clients write to a test topic 
on this cluster. The following graph shows the jvm heap usage and gc activity 
in the past 24 hours or so. The cluster ran fine with low heap usage and low 
cpu usage.  However, the heap usage and the cpu usage of brokers increased 
sharply when we added or terminated brokers in this cluster (for broker 
termination, there were no topic partitions allocated on those terminated 
nodes).  

http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMjgvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS01LTQzLTU=

http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMjgvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS01LTQzLTU=

Sometimes the cluster can be recovered by turning off the ssl writing traffic 
to the cluster, letting the brokers garbage collect the objects in the old 
gen, and resuming the ssl writing traffic.  Sometimes the cluster still could 
not recover fully due to a dramatic increase in heap size and high cpu usage when 
we turned on the ssl writing traffic again. 

> memory leakage in org.apache.kafka.common.network.Selector
> --
>
> Key: KAFKA-7304
> URL: https://issues.apache.org/jira/browse/KAFKA-7304
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Yu Yang
>Priority: Critical
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
> Attachments: 7304.v4.txt, 7304.v7.txt, Screen Shot 2018-08-16 at 
> 11.04.16 PM.png, Screen Shot 2018-08-16 at 11.06.38 PM.png, Screen Shot 
> 2018-08-16 at 12.41.26 PM.png, Screen Shot 2018-08-16 at 4.26.19 PM.png, 
> Screen Shot 2018-08-17 at 1.03.35 AM.png, Screen Shot 2018-08-17 at 1.04.32 
> AM.png, Screen Shot 2018-08-17 at 1.05.30 AM.png
>
>
> We are testing secured writing to kafka through ssl. Testing at small scale, 
> ssl writing to kafka was fine. However, when we enabled ssl writing at a 
> larger scale (>40k clients writing concurrently), the kafka brokers soon hit 
> an OutOfMemory issue with a 4G memory setting. We tried increasing the 
> heap size to 10Gb, but encountered the same issue. 
> We took a few heap dumps, and found that most of the heap memory is 
> referenced through org.apache.kafka.common.network.Selector objects.  There 
> are two Channel map fields in Selector. It seems that somehow the objects are 
> not deleted from the maps in a timely manner. 
> One observation is that the memory leak seems related to kafka partition 
> leader changes. If there is a broker restart etc. in the cluster that causes 
> a partition leadership change, the brokers may hit the OOM issue faster. 
> {code}
> private final Map channels;
> private final Map closingChannels;
> {code}
> Please see the  attached images and the following link for sample gc 
> analysis. 
> http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMTcvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS0yLTM5LTM0
> the command line for running kafka: 
> {code}
> java -Xms10g -Xmx10g -XX:NewSize=512m -XX:MaxNewSize=512m 
> -Xbootclasspath/p:/usr/local/libs/bcp -XX:MetaspaceSize=128m -XX:+UseG1GC 
> -XX:MaxGCPauseMillis=25 

[jira] [Updated] (KAFKA-7318) Should introduce a offset reset policy to consume only the messages after subscribing

2018-08-28 Thread joechen8...@gmail.com (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

joechen8...@gmail.com updated KAFKA-7318:
-
Attachment: KafkaTest.java

> Should introduce a offset reset policy to consume only the messages after 
> subscribing
> -
>
> Key: KAFKA-7318
> URL: https://issues.apache.org/jira/browse/KAFKA-7318
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 1.1.0, 1.1.1, 2.0.0
>Reporter: joechen8...@gmail.com
>Priority: Major
> Attachments: KafkaTest.java
>
>
> In our situation, we want the consumers to consume only the messages that were 
> produced after subscribing.   
> Currently, kafka supports 3 strategies for auto.offset.reset, but it seems none 
> of them supports the feature we want.
>  * {{latest}} (the default): if a consumer subscribes to a new topic and then 
> closes, and some messages are produced in the meantime, the consumer 
> cannot poll these messages.
>  * {{earliest}}: the consumer may consume all the messages produced to the topic before 
> subscribing.
>  * {{none}}: not in scope here.
> Before version 1.1.0, we made the consumer poll and commit after subscribing, 
> as below. This marks the offset as 0 and works (enable.auto.commit is 
> false).
>  
> {code:java}
> consumer.subscribe(topics, consumerRebalanceListener);
> if(consumer.assignment().isEmpty()) {
>consumer.poll(0);
>consumer.commitSync();
> }
> {code}
> After upgrading the clients to >=1.1.0, this is broken. It seems to have been broken by the 
> fix 
> [KAFKA-6397|https://github.com/apache/kafka/commit/677881afc52485aef94150be50d6258d7a340071#diff-267b7c1e68156c1301c56be63ae41dd0],
>  but I am not sure about that.  I then tried invoking 
> consumer.position(partitions) in onPartitionsAssigned of 
> ConsumerRebalanceListener, and it works again, but it looks strange to get 
> the position and then do nothing with it.  
>  
> So we want to know whether there is a formal way to do this. Maybe introducing 
> another strategy for auto.offset.reset that only consumes the messages produced after the 
> consumer subscribes would be ideal.
>  
>  
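
The gap described above can be illustrated with a small standalone model. This is a sketch only and does not use the Kafka client API: the class OffsetResetSketch, the AT_SUBSCRIBE policy and the startOffset method are hypothetical names made up for illustration. It shows which offset a consumer without a committed position would start from under each policy, and why committing right after subscribing gives the requested behaviour.

```java
import java.util.OptionalLong;

/** Hypothetical model of offset reset policies; not the Kafka client API. */
public class OffsetResetSketch {
    /** AT_SUBSCRIBE is the policy requested in this issue; it does not exist in Kafka. */
    public enum Policy { EARLIEST, LATEST, NONE, AT_SUBSCRIBE }

    /**
     * Pick the offset a consumer starts reading from.
     *
     * @param committed      last committed offset for the group, if any
     * @param logStart       first offset currently in the partition log
     * @param logEnd         offset the next produced message will get
     * @param endAtSubscribe log end offset at the moment the consumer first subscribed
     */
    public static long startOffset(Policy policy, OptionalLong committed,
                                   long logStart, long logEnd, long endAtSubscribe) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // a committed offset always wins
        }
        switch (policy) {
            case EARLIEST:
                return logStart;
            case LATEST:
                return logEnd;
            case AT_SUBSCRIBE:
                return endAtSubscribe;
            default:
                throw new IllegalStateException("no committed offset and policy is NONE");
        }
    }
}
```

For example, suppose the log end was 5 when the consumer subscribed, the consumer closed without committing, and 3 more messages arrived (log end 8). On restart, EARLIEST starts at 0 and replays old messages, LATEST starts at 8 and skips offsets 5 to 7, and the hypothetical AT_SUBSCRIBE starts at 5. Committing offset 5 immediately after subscribing, as the poll and commitSync workaround did, has the same effect under any policy.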





[jira] [Comment Edited] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector

2018-08-28 Thread Yu Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594320#comment-16594320
 ] 

Yu Yang edited comment on KAFKA-7304 at 8/28/18 7:26 AM:
-

After more experiments, we currently think that the issue is caused by too many 
idle ssl connections that are not closed in time. 

I set up a test cluster of 24 brokers using d2.8xlarge instances, allocated 
32gb for kafka process heap space, and had ~40k clients write to a test topic 
on this cluster. The following graph shows the jvm heap usage and gc activity 
in the past 24 hours or so. The cluster ran fine with low heap usage and low 
cpu usage.  However, the heap usage and the cpu usage of brokers increased 
sharply when we added or terminated brokers in this cluster (for broker 
termination, there were no topic partitions allocated on those terminated 
nodes).  

http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMjgvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS01LTQzLTU=

http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMjgvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS01LTQzLTU=

Sometimes the cluster can be recovered by turning off the ssl writing traffic 
to the cluster, letting the brokers garbage collect the objects in the old 
gen, and resuming the ssl writing traffic.  Sometimes the cluster still could 
not recover fully due to a dramatic increase in heap size and high cpu usage when 
we turned on the ssl writing traffic again. 


was (Author: yuyang08):
After more experiments, we currently think that the issue is caused by too many 
idle ssl connections that are not closed in time. 

I set up a test cluster of 24 brokers using d2.8xlarge instances, allocated 
32gb for kafka process heap space, and had ~40k clients write to a test topic 
on this cluster. The following graph shows the jvm heap usage and gc activity 
in the past 24 hours or so. The cluster ran fine with low heap usage and low 
cpu usage.  However, the heap usage and the cpu usage of brokers increased 
sharply when we added or terminated brokers in this cluster (for broker 
termination, there were no topic partitions allocated on those terminated 
nodes).  

http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMjgvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS01LTQzLTU=

The cluster can be recovered by turning off the ssl writing traffic to the 
cluster, letting the brokers garbage collect the objects in the old gen, and 
resuming the ssl writing traffic. 

> memory leakage in org.apache.kafka.common.network.Selector
> --
>
> Key: KAFKA-7304
> URL: https://issues.apache.org/jira/browse/KAFKA-7304
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Yu Yang
>Priority: Critical
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
> Attachments: 7304.v4.txt, 7304.v7.txt, Screen Shot 2018-08-16 at 
> 11.04.16 PM.png, Screen Shot 2018-08-16 at 11.06.38 PM.png, Screen Shot 
> 2018-08-16 at 12.41.26 PM.png, Screen Shot 2018-08-16 at 4.26.19 PM.png, 
> Screen Shot 2018-08-17 at 1.03.35 AM.png, Screen Shot 2018-08-17 at 1.04.32 
> AM.png, Screen Shot 2018-08-17 at 1.05.30 AM.png
>
>
> We are testing secured writing to kafka through ssl. Testing at small scale, 
> ssl writing to kafka was fine. However, when we enabled ssl writing at a 
> larger scale (>40k clients writing concurrently), the kafka brokers soon hit 
> an OutOfMemory issue with a 4G memory setting. We tried increasing the 
> heap size to 10Gb, but encountered the same issue. 
> We took a few heap dumps, and found that most of the heap memory is 
> referenced through org.apache.kafka.common.network.Selector objects.  There 
> are two Channel map fields in Selector. It seems that somehow the objects are 
> not deleted from the maps in a timely manner. 
> One observation is that the memory leak seems related to kafka partition 
> leader changes. If there is a broker restart etc. in the cluster that causes 
> a partition leadership change, the brokers may hit the OOM issue faster. 
> {code}
> private final Map channels;
> private final Map closingChannels;
> {code}
> Please see the  attached images and the following link for sample gc 
> analysis. 
> http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMTcvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS0yLTM5LTM0
> the command line for running kafka: 
> {code}
> java -Xms10g -Xmx10g -XX:NewSize=512m -XX:MaxNewSize=512m 
> -Xbootclasspath/p:/usr/local/libs/bcp -XX:MetaspaceSize=128m -XX:+UseG1GC 
> -XX:MaxGCPauseMillis=25 -XX:InitiatingHeapOccupancyPercent=35 
> -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=25 
> -XX:MaxMetaspaceFreeRatio=75 -XX:+PrintGCDetails -XX:+PrintGCDateStamps 
> -XX:+PrintTenuringDistribution -Xloggc:/var/log/kafka/gc.log 
> -XX:+UseGCLogFileRotation 

[jira] [Resolved] (KAFKA-2385) zookeeper-shell does not work

2018-08-28 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2385.
--
Resolution: Fixed

Fixed in newer ZooKeeper versions (3.4.7+).

> zookeeper-shell does not work
> -
>
> Key: KAFKA-2385
> URL: https://issues.apache.org/jira/browse/KAFKA-2385
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Jiangjie Qin
>Assignee: Flavio Junqueira
>Priority: Major
>
> The zookeeper shell shipped with Kafka does not work because the jline jar is 
> missing.
> [jqin@jqin-ld1 bin]$ ./zookeeper-shell.sh localhost:2181
> Connecting to localhost:2181
> Welcome to ZooKeeper!
> JLine support is disabled
> WATCHER::
> WatchedEvent state:SyncConnected type:None path:null





[jira] [Resolved] (KAFKA-2238) KafkaMetricsConfig cannot be configured in broker (KafkaConfig)

2018-08-28 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2238.
--
Resolution: Duplicate

Resolving as a duplicate of KAFKA-5066, which will add docs for KafkaMetricsConfig.

> KafkaMetricsConfig cannot be configured in broker (KafkaConfig)
> ---
>
> Key: KAFKA-2238
> URL: https://issues.apache.org/jira/browse/KAFKA-2238
> Project: Kafka
>  Issue Type: Bug
>Reporter: Aditya Auradkar
>Assignee: Aditya Auradkar
>Priority: Major
> Attachments: KAFKA-2238.patch
>
>
> None of the metrics config values are included in KafkaConfig, and consequently 
> they cannot be configured in the brokers. This is because the 
> KafkaMetricsReporter is passed a properties object generated by calling 
> toProps on KafkaConfig:
> KafkaMetricsReporter.startReporters(new 
> VerifiableProperties(serverConfig.toProps))
> However, KafkaConfig never writes these values into the properties object, and 
> hence they aren't configurable. The defaults always apply.
> Add the following metrics configs to KafkaConfig:
> kafka.metrics.reporters, kafka.metrics.polling.interval.secs, 
> kafka.csv.metrics.reporter.enabled, kafka.csv.metrics.dir
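
The failure mode described above can be reproduced with a small standalone model. This is a sketch under assumptions and not the KafkaConfig code: the class TypedConfigSketch and its methods are hypothetical, and the default of "10" used in the usage note is made up for illustration. It shows how a typed config that re-emits only the keys it models silently drops everything else, so a downstream reader falls back to defaults.

```java
import java.util.Properties;

/** Hypothetical typed config that only models broker.id; not the real KafkaConfig. */
public class TypedConfigSketch {
    private final Properties original;
    private final int brokerId;

    public TypedConfigSketch(Properties props) {
        this.original = props;
        this.brokerId = Integer.parseInt(props.getProperty("broker.id", "0"));
    }

    /** Lossy round trip: only modeled keys are written back, everything else is dropped. */
    public Properties toPropsLossy() {
        Properties out = new Properties();
        out.setProperty("broker.id", Integer.toString(brokerId));
        return out;
    }

    /**
     * An alternative (not the fix proposed in this issue): copy the original
     * properties first, then overlay the parsed values.
     */
    public Properties toPropsPreserving() {
        Properties out = new Properties();
        out.putAll(original);
        out.setProperty("broker.id", Integer.toString(brokerId));
        return out;
    }
}
```

With broker.id=1 and kafka.metrics.polling.interval.secs=5 as input, reading kafka.metrics.polling.interval.secs from toPropsLossy() with an assumed default of "10" yields "10", while toPropsPreserving() yields "5". The issue's own proposal, adding the metrics keys to KafkaConfig, corresponds to modeling those keys in the typed class so that they are written back.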


