[jira] [Commented] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14197908#comment-14197908 ]

Vishal commented on KAFKA-1745:
-------------------------------

No, since I figured that calling producer.close() before returning the producer object to the pool would make that producer object unusable afterwards.

Key: KAFKA-1745
URL: https://issues.apache.org/jira/browse/KAFKA-1745
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.1.1
Environment: Mac OS Mavericks
Reporter: Vishal
Priority: Critical

Hi, I'm using the Java client API for Kafka. I want to send data to Kafka through a producer pool, since I'm using a sync producer. The thread that sends the data comes from a thread pool that grows and shrinks with usage. When I send data from one thread, 1 KQUEUE and 2 PIPEs are created (observed via lsof). If I keep using the same thread this is fine, but when a new thread sends data to Kafka (using producer.send()), a new KQUEUE and 2 new PIPEs are created. The problem is that when a thread is removed from the thread pool and a new one is created, the old KQUEUEs and PIPEs are not destroyed and keep showing up as open files. This is a major problem because the number of open files keeps increasing and never decreases. Please suggest any solutions.
[jira] [Comment Edited] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14197908#comment-14197908 ]

Vishal edited comment on KAFKA-1745 at 11/5/14 8:56 AM:
--------------------------------------------------------

No, since I figured that calling producer.close() before returning the producer object to the pool would make that producer object unusable afterwards.
[jira] [Updated] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vishal updated KAFKA-1745:
--------------------------

Description: appended to the description above: "FYI, the number of TCP connections established remains constant throughout."
[jira] [Updated] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vishal updated KAFKA-1745:
--------------------------

Description: refined the previous note to: "FYI, the number of TCP connections established from the producer system to the Kafka broker remains constant throughout."
[jira] [Commented] (KAFKA-1667) topic-level configuration not validated
[ https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198212#comment-14198212 ]

Dmytro Kostiuchenko commented on KAFKA-1667:
--------------------------------------------

Hi. I'm willing to work on this. Currently I have a few questions:

1. What should have higher priority: keeping the same interface for LogConfig, or making the code consistent with the other configs based on ConfigDef? In the latter case the changes would also impact AdminUtils, TopicCommand, and perhaps some other classes.
2. Are there any guidelines for assigning priorities?
3. Is there some place to check the valid ranges for the properties? It is always possible to infer them from the code, but it would of course be easier to have a reference doc.

Thanks

Key: KAFKA-1667
URL: https://issues.apache.org/jira/browse/KAFKA-1667
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
Labels: newbie

I was able to set the configuration for a topic to these invalid values:

{code}
Topic:topic-config-test  PartitionCount:1  ReplicationFactor:2  Configs:min.cleanable.dirty.ratio=-30.2,segment.bytes=-1,retention.ms=-12,cleanup.policy=lol
{code}

It seems that the values are saved as long as they are of the correct type, but they are not validated like the corresponding broker-level properties.
[jira] [Commented] (KAFKA-1100) metrics shouldn't have generation/timestamp specific names
[ https://issues.apache.org/jira/browse/KAFKA-1100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198433#comment-14198433 ]

Vladimir Tretyakov commented on KAFKA-1100:
-------------------------------------------

Right [~otis], names will be:

{code}
kafka.consumer:type=FetchRequestAndResponseMetrics,name=FetchRequestRateAndTimeMs,clientId=af_servers,allBrokers=true
kafka.consumer:type=FetchRequestAndResponseMetrics,name=FetchRequestRateAndTimeMs,clientId=af_servers,brokerHost=wawanawna,brokerPort=9092
{code}

Key: KAFKA-1100
URL: https://issues.apache.org/jira/browse/KAFKA-1100
Project: Kafka
Issue Type: Bug
Reporter: Jason Rosenberg

I've noticed that there are several metrics that seem useful for monitoring over time, but which contain generational timestamps in the metric name. We are using the Yammer metrics libraries to send metrics data in a background thread every 10 seconds (to Kafka, actually), and they eventually end up in a metrics database (Graphite, OpenTSDB). The metrics then get graphed via a UI, and we can see metrics going way back, etc.

Unfortunately, many of the metrics coming from Kafka have names that change any time the server or consumer is restarted, which makes it hard to create graphs over long periods of time (spanning app restarts). For example, names like:

kafka.consumer.FetchRequestAndResponseMetrics.square-1371718712833-e9bb4d10-0-508818741-AllBrokersFetchRequestRateAndTimeMs

or:

kafka.consumer.ZookeeperConsumerConnector...topicName.square-1373476779391-78aa2e83-0-FetchQueueSize

In our staging environment, our servers are on regular auto-deploy cycles (they restart every few hours), so it is just not longitudinally usable to have metric names constantly changing like this. Is there something that can easily be done? Is it really necessary to have so much cryptic info in the metric name?
[jira] [Comment Edited] (KAFKA-1100) metrics shouldn't have generation/timestamp specific names
[ https://issues.apache.org/jira/browse/KAFKA-1100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198433#comment-14198433 ]

Vladimir Tretyakov edited comment on KAFKA-1100 at 11/5/14 2:35 PM:
--------------------------------------------------------------------

Right [~otis], names will be:

{code}
kafka.consumer:type=FetchRequestAndResponseMetrics,name=FetchRequestRateAndTimeMs,clientId=af_servers,allBrokers=true
kafka.consumer:type=FetchRequestAndResponseMetrics,name=FetchRequestRateAndTimeMs,clientId=af_servers,brokerHost=wawanawna,brokerPort=9092
kafka.consumer:type=ZookeeperConsumerConnector,name=FetchQueueSize,clientId=af_servers,topic=spm_topic,threadId=0
{code}
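[Editor's note] The value of the key=value ObjectName form above is that it stays stable and pattern-matchable across restarts, unlike the timestamped names in the original report. A minimal Java sketch that queries all instances of the fetch-rate metric over remote JMX; the service URL is a placeholder for wherever the consumer JVM exposes JMX:

{code}
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class StableMetricNamesSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative endpoint; point this at the consumer's JMX port.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            // One wildcard pattern matches the metric for every broker,
            // because no generation/timestamp is embedded in the name.
            ObjectName pattern = new ObjectName(
                    "kafka.consumer:type=FetchRequestAndResponseMetrics,"
                    + "name=FetchRequestRateAndTimeMs,*");
            Set<ObjectName> names = mbsc.queryNames(pattern, null);
            for (ObjectName name : names) {
                System.out.println(name);
            }
        }
    }
}
{code}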
[jira] [Commented] (KAFKA-1723) make the metrics name in new producer more standard
[ https://issues.apache.org/jira/browse/KAFKA-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198439#comment-14198439 ]

Vladimir Tretyakov commented on KAFKA-1723:
-------------------------------------------

Hi [~junrao],

{quote}
It uses a proprietary newly developed metrics package.
{quote}

Does that mean that users of the new producer will see 'old'-style metric names?

{quote}
proprietary newly developed metrics package
{quote}

Maybe it is possible to add the new metrics in sync with https://issues.apache.org/jira/browse/KAFKA-1481? I just worry that Kafka may end up with many metric naming conventions in the future, sorry :)

Key: KAFKA-1723
URL: https://issues.apache.org/jira/browse/KAFKA-1723
Project: Kafka
Issue Type: Sub-task
Components: clients
Affects Versions: 0.8.2
Reporter: Jun Rao
Fix For: 0.8.3

The JMX name in the new producer looks like the following:

kafka.producer.myclientid:type=mytopic

However, this can be ambiguous since we allow "." in both the client id and the topic.
[jira] [Commented] (KAFKA-1723) make the metrics name in new producer more standard
[ https://issues.apache.org/jira/browse/KAFKA-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198502#comment-14198502 ]

Jun Rao commented on KAFKA-1723:
--------------------------------

Yes, we can make the MBean names in the new metrics package consistent with what's in the broker.
[jira] [Commented] (KAFKA-1667) topic-level configuration not validated
[ https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198533#comment-14198533 ]

Jun Rao commented on KAFKA-1667:
--------------------------------

For now, we can probably just keep LogConfig and add the validation in LogConfig.fromProps(). Later on, we can change all broker-side configs (KafkaConfig, ZKConfig, TopicConfig) to use ConfigDef together.
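[Editor's note] A minimal sketch of the validation step Jun describes, written in Java for illustration (the real fromProps() lives in the Scala LogConfig; the ConfigDef here is a cut-down stand-in covering only two of the topic-level properties). The key point is that ConfigDef.parse() runs each property's validator and throws ConfigException on a bad value, which is exactly the hook fromProps() needs:

{code}
import java.util.Properties;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

public class TopicConfigValidationSketch {
    // Cut-down stand-in for LogConfig's full definition: two topic-level
    // properties with defaults and a lower bound of zero.
    private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define("segment.bytes", ConfigDef.Type.INT, 1024 * 1024,
                ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM,
                "The segment file size for the topic.")
        .define("retention.ms", ConfigDef.Type.LONG, 7 * 24 * 60 * 60 * 1000L,
                ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM,
                "The retention time for the topic.");

    // The shape of the fix: fromProps() calls parse(), which applies every
    // validator and rejects values like segment.bytes=-1.
    public static void validate(Properties props) {
        CONFIG_DEF.parse(props);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("segment.bytes", "-1"); // one of the invalid values from the report
        try {
            validate(props);
        } catch (ConfigException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
{code}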
[jira] [Commented] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198537#comment-14198537 ]

Jun Rao commented on KAFKA-1745:
--------------------------------

If you don't call producer.close(), the socket connections and the queue associated with the producer will not be freed.
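[Editor's note] One way to reconcile pooling with Jun's point is to keep each pooled producer open for its whole pooled lifetime and call close() only at pool shutdown (or eviction), never on every return. A minimal sketch against the 0.8 sync-producer API; the broker list, serializer, and pool size are illustrative, and error handling is elided:

{code}
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerPoolSketch {
    private final BlockingQueue<Producer<String, String>> pool;

    public ProducerPoolSketch(int size, String brokerList) {
        pool = new ArrayBlockingQueue<Producer<String, String>>(size);
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "sync");
        ProducerConfig config = new ProducerConfig(props);
        for (int i = 0; i < size; i++) {
            pool.add(new Producer<String, String>(config));
        }
    }

    public void send(String topic, String message) throws InterruptedException {
        // Borrow a producer, send, and return it; the producer stays open
        // for its whole pooled lifetime, so no resources leak per thread.
        Producer<String, String> producer = pool.take();
        try {
            producer.send(new KeyedMessage<String, String>(topic, message));
        } finally {
            pool.put(producer);
        }
    }

    // Close every producer exactly once on shutdown; this frees the
    // sockets and queues Jun mentions.
    public void shutdown() {
        Producer<String, String> producer;
        while ((producer = pool.poll()) != null) {
            producer.close();
        }
    }
}
{code}

If a single shared producer suffices for the workload, the same rule applies: close it once at application shutdown, not after each send.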
[jira] [Created] (KAFKA-1755) Log cleaner thread should not exit on errors
Joel Koshy created KAFKA-1755:
------------------------------

Summary: Log cleaner thread should not exit on errors
Key: KAFKA-1755
URL: https://issues.apache.org/jira/browse/KAFKA-1755
Project: Kafka
Issue Type: Bug
Reporter: Joel Koshy
Fix For: 0.8.3

The log cleaner is a critical process when using compacted topics. However, if there is any error in any topic (notably if a key is missing), then the cleaner exits and all other compacted topics are adversely affected as well - i.e., compaction stops across the board.

This can be improved by aborting compaction for just the failing topic on any error and keeping the thread from exiting. Another improvement would be to reject messages without keys that are sent to compacted topics, although this is not enough by itself.
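[Editor's note] The proposed behavior boils down to catching failures per topic inside the cleaning loop instead of letting them propagate and kill the thread. An illustrative Java sketch of that pattern only; the actual cleaner is Scala and structured differently, and the compaction work units here are hypothetical:

{code}
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CleanerLoopSketch implements Runnable {
    private final Map<String, Runnable> compactionTasks; // topic -> compaction work unit
    private final Set<String> uncleanable = ConcurrentHashMap.newKeySet();

    public CleanerLoopSketch(Map<String, Runnable> compactionTasks) {
        this.compactionTasks = compactionTasks;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            for (Map.Entry<String, Runnable> entry : compactionTasks.entrySet()) {
                String topic = entry.getKey();
                if (uncleanable.contains(topic)) {
                    continue; // compaction stays aborted for topics that failed
                }
                try {
                    entry.getValue().run(); // compact one topic
                } catch (Exception e) {
                    // Mark only this topic as uncleanable; the thread survives
                    // and every other compacted topic keeps getting cleaned.
                    uncleanable.add(topic);
                }
            }
        }
    }
}
{code}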
KAFKA-1667 guidance required
Hi, I decided to fix KAFKA-1667. Currently I have an initial patch, which seems to work. I would like to know whether the overall code is ok. Also, there are a few TODOs in the code:

1. I haven't added documentation to the properties, as ConfigDef suggests. Should I?
2. I'm not sure what Importance should be assigned to the properties. It is NORMAL for all properties. Where can I find some info on this?
3. I'm not totally sure that the validations are correct. I tried to figure that out from the code, but might still miss something.

Finally, is this mailing list the right place to ask such questions, or should I submit the patch to the JIRA ticket and get a review there even if I'm not sure about its quality? Thanks for the help.

The patch itself:

diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index c4cea2c..347e252 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -19,6 +19,8 @@
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Collections;
+import java.util.Set;
 
 /**
  * This class is used for specifying the set of expected configurations, their type, their defaults, their
@@ -49,6 +51,14 @@ public class ConfigDef {
     private final Map<String, ConfigKey> configKeys = new HashMap<String, ConfigKey>();
 
     /**
+     * Returns an unmodifiable set of the property names defined in this {@linkplain ConfigDef}
+     * @return a new unmodifiable {@link Set} instance containing the keys
+     */
+    public Set<String> names() {
+        return Collections.unmodifiableSet(configKeys.keySet());
+    }
+
+    /**
      * Define a new configuration
      * @param name The name of the config parameter
      * @param type The type of the config
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 0b2735e..285c033 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -256,7 +256,7 @@ object TopicCommand {
       .ofType(classOf[String])
     val nl = System.getProperty("line.separator")
     val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered." +
-      " The following is a list of valid configurations: " + nl + LogConfig.ConfigNames.map("\t" + _).mkString(nl) + nl +
+      " The following is a list of valid configurations: " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
       " See the Kafka documentation for full details on the topic configs.")
       .withRequiredArg
       .describedAs("name=value")
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index e48922a..3e0a986 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -21,7 +21,7 @@
 import java.util.Properties
 import org.apache.kafka.common.utils.Utils
 import scala.collection._
-import kafka.common._
+import org.apache.kafka.common.config.ConfigDef
 
 object Defaults {
   val SegmentSize = 1024 * 1024
@@ -106,6 +106,10 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
 }
 
 object LogConfig {
+
+  val Delete = "delete"
+  val Compact = "compact"
+
   val SegmentBytesProp = "segment.bytes"
   val SegmentMsProp = "segment.ms"
   val SegmentJitterMsProp = "segment.jitter.ms"
@@ -123,46 +127,61 @@ object LogConfig {
   val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
   val MinInSyncReplicasProp = "min.insync.replicas"
 
-  val ConfigNames = Set(SegmentBytesProp,
-    SegmentMsProp,
-    SegmentJitterMsProp,
-    SegmentIndexBytesProp,
-    FlushMessagesProp,
-    FlushMsProp,
-    RetentionBytesProp,
-    RententionMsProp,
-    MaxMessageBytesProp,
-    IndexIntervalBytesProp,
-    FileDeleteDelayMsProp,
-    DeleteRetentionMsProp,
-    MinCleanableDirtyRatioProp,
-    CleanupPolicyProp,
-    UncleanLeaderElectionEnableProp,
-    MinInSyncReplicasProp)
+  private val configDef = {
+    import ConfigDef.Range._
+    import ConfigDef.ValidString._
+    import ConfigDef.Type._
+    import ConfigDef.Importance._
+    import java.util.Arrays.asList
+
+    // TODO clarify importance
+    // TODO clarify validations
+    // TODO define documentation
+    new ConfigDef()
+      .define(SegmentBytesProp, INT,
[jira] [Commented] (KAFKA-1755) Log cleaner thread should not exit on errors
[ https://issues.apache.org/jira/browse/KAFKA-1755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198578#comment-14198578 ]

Chris Riccomini commented on KAFKA-1755:
----------------------------------------

It might also be desirable to allow log compaction to continue on the topic in question, and simply keep all messages without keys, without doing any compaction on them.
[jira] [Commented] (KAFKA-1723) make the metrics name in new producer more standard
[ https://issues.apache.org/jira/browse/KAFKA-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198580#comment-14198580 ]

Vladimir Tretyakov commented on KAFKA-1723:
-------------------------------------------

Good news, thx [~junrao]
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198592#comment-14198592 ]

Jun Rao commented on KAFKA-1738:
--------------------------------

The log shows that the log dir was created for topic_A. Were you looking at the right directory?

[2014-11-04 21:36:30,724] INFO Created log for partition [topic_A,0] in /tmp/kafka-logs with properties

Key: KAFKA-1738
URL: https://issues.apache.org/jira/browse/KAFKA-1738
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.1.1, 0.8.2
Environment: Linux, 2GB RAM, 2 Core CPU
Reporter: Pradeep
Attachments: 1738.zip

We are using the Kafka topic APIs to create topics. In some cases the topic gets created, but we don't see the partition-specific files, and when a producer/consumer tries to get the topic metadata it fails with an exception. The same happens if one tries to create the topic using the command line.

k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException

Steps to reproduce:
1. Stop Kafka using kill -9 <PID of Kafka>.
2. Start Kafka.
3. Create a topic with a partition and replication factor of 1.
4. Check the response: "Created topic <topic_name>".
5. Run the list command to verify it was created.
6. Now check the data directory of Kafka. There will not be one for the newly created topic.

We see issues when we are creating new topics. This happens randomly and we don't know the exact reason. We see the logs below in the controller at the time of creating topics that don't have the partition files.

[2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [JobJTopic,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,667] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-03 13:12:50,794] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:DMIPVM,port:9092 (kafka.controller.RequestSendThread)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at kafka.utils.Utils$.read(Utils.scala:381)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2014-11-03 13:12:50,965] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:43;ClientId:id_0-host_null-port_9092;AliveBrokers:id:0,host:DMIPVM,port:9092;PartitionState:[JobJTopic,0] -> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:DMIPVM,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
        at
Re: KAFKA-1667 guidance required
Dmytro,

We can discuss your questions on the jira.

Thanks,

Jun

On Wed, Nov 5, 2014 at 8:18 AM, Dmytro Kostiuchenko <dmytro.kostiuche...@gmail.com> wrote:
> [quoted message and patch trimmed; see the original message above]
[jira] [Commented] (KAFKA-1667) topic-level configuration not validated
[ https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198614#comment-14198614 ]

Jun Rao commented on KAFKA-1667:
--------------------------------

If you already started on converting LogConfig to ConfigDef, that's fine too. To start with, I will assign medium importance to all those topic-level config properties.
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198633#comment-14198633 ]

BalajiSeshadri commented on KAFKA-1476:
---------------------------------------

[~junrao] or [~jkreps], can any of you review it? It seems like Neha is pretty busy.

Key: KAFKA-1476
URL: https://issues.apache.org/jira/browse/KAFKA-1476
Project: Kafka
Issue Type: Wish
Components: tools
Affects Versions: 0.8.1.1
Reporter: Ryan Williams
Assignee: BalajiSeshadri
Labels: newbie
Fix For: 0.9.0
Attachments: ConsumerCommand.scala, KAFKA-1476-LIST-GROUPS.patch, KAFKA-1476-RENAME.patch, KAFKA-1476-REVIEW-COMMENTS.patch, KAFKA-1476.patch

It would be useful to have a way to get a list of the currently active consumer groups via some tool/script that ships with Kafka. This would make the system tools easier to explore. For example, running the ConsumerOffsetChecker requires a group option:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group ?

But when just getting started with Kafka, using the console producer and consumer, it is not clear what value to use for the group option. If the consumer groups could be listed, it would be clear what value to use.

Background: http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e
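[Editor's note] For the ZooKeeper-based consumers in 0.8, each active group registers itself under the /consumers znode, so listing groups reduces to reading that path's children. A minimal Java sketch using ZkClient; the connection string is illustrative, and this is not the attached patch:

{code}
import java.util.List;
import org.I0Itec.zkclient.ZkClient;

public class ListConsumerGroupsSketch {
    public static void main(String[] args) {
        // Session and connection timeouts of 30s; adjust to taste.
        ZkClient zkClient = new ZkClient("localhost:2181", 30000, 30000);
        try {
            // Each child of /consumers is a consumer group name.
            List<String> groups = zkClient.getChildren("/consumers");
            for (String group : groups) {
                System.out.println(group);
            }
        } finally {
            zkClient.close();
        }
    }
}
{code}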
[jira] [Updated] (KAFKA-1667) topic-level configuration not validated
[ https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmytro Kostiuchenko updated KAFKA-1667:
---------------------------------------

Attachment: KAFKA-1667.patch

Initial patch, with 3 TODOs to resolve.
[jira] [Commented] (KAFKA-1667) topic-level configuration not validated
[ https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198637#comment-14198637 ]

Dmytro Kostiuchenko commented on KAFKA-1667:
--------------------------------------------

As suggested on the mailing list, moving the discussion to JIRA. Currently I have an initial patch, which seems to work. I would like to know whether the overall code is ok. Also, there are a few TODOs in the code:

1. I haven't added documentation to the properties, as {{ConfigDef}} suggests. Should I?
2. I'm not sure what Importance should be assigned to the properties. It is {{MEDIUM}} for all properties. Where can I find some info on this?
3. I'm not totally sure that the validations are correct. I tried to figure that out from the code, but might still miss something.

Please find the patch attached. Thanks
[jira] [Issue Comment Deleted] (KAFKA-1667) topic-level configuration not validated
[ https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmytro Kostiuchenko updated KAFKA-1667:
---------------------------------------

Comment: was deleted (it duplicated the comment above)
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198648#comment-14198648 ]

Pradeep commented on KAFKA-1738:
--------------------------------

I suggest you run the script once on a working Kafka setup.
[jira] [Comment Edited] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198648#comment-14198648 ]

Pradeep edited comment on KAFKA-1738 at 11/5/14 5:03 PM:
---------------------------------------------------------

I suggest you run the script once on a working Kafka setup. The only error that we see in the Kafka controller.log is:

[2014-11-05 11:42:53,088] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [topic_13,0] are: [List(0)] (kafka.controller.PartitionStateMachine)
[2014-11-05 11:42:53,088] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [topic_13,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:5) (kafka.controller.PartitionStateMachine)
[2014-11-05 11:42:53,097] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:DMIPVM,port:9092 (kafka.controller.RequestSendThread)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at kafka.utils.Utils$.read(Utils.scala:381)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2014-11-05 11:42:53,097] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=topic_13,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-05 11:42:53,097] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 5 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:5;CorrelationId:16;ClientId:id_0-host_null-port_9092;AliveBrokers:id:0,host:DMIPVM,port:9092;PartitionState:[topic_13,0] -> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:5),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:DMIPVM,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:97)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2014-11-05 11:42:53,099] INFO [Controller-0-to-broker-0-send-thread], Controller 0 connected to id:0,host:DMIPVM,port:9092 for sending state change requests (kafka.controller.RequestSendThread)
[2014-11-05 11:45:34,605] TRACE [Controller 0]: checking need to trigger partition rebalance (kafka.controller.KafkaController)
[2014-11-05 11:45:34,607] DEBUG [Controller 0]: preferred replicas by broker Map(0 -> Map([topic_8,0] -> List(0), [topic_13,0] -> List(0), [topic_6,0] -> List(0), [topic_1,0] -> List(0), [topic_9,0] -> List(0), [topic_2,0] -> List(0), [topic_11,0] -> List(0), [topic_4,0] -> List(0), [topic_12,0] -> List(0), [topic_7,0] -> List(0), [topic_3,0] -> List(0), [topic_10,0] -> List(0))) (kafka.controller.KafkaController)
[2014-11-05 11:45:34,608] DEBUG [Controller 0]: topics not in preferred replica Map() (kafka.controller.KafkaController)
[2014-11
[jira] [Comment Edited] (KAFKA-1667) topic-level configuration not validated
[ https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198638#comment-14198638 ]

Dmytro Kostiuchenko edited comment on KAFKA-1667 at 11/5/14 5:04 PM:
---------------------------------------------------------------------

So, I decided to just add validation and to change as little as possible. There is a new method in {{ConfigDef}}, which is needed to avoid duplication in LogConfig (instead of maintaining ConfigNames we derive the names from configDef). All properties are now of MEDIUM importance and have no documentation. All numeric values are required to be >= 0, and the cleanup policy property is validated against the set [compact, delete].

I would appreciate it if you could take a look at the patch and say whether it is ok to submit it as a fix.

P.S. Sorry for my megaverbosity. This is my first contribution.

was (Author: edio): Initial patch with 3 TODOs to resolve
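[Editor's note] The validators Dmytro describes map onto ConfigDef roughly as follows; a cut-down Java sketch in which the property names are real, the defaults and empty doc strings are placeholders, and the exact ValidString factory signature may differ in the 2014 code base:

{code}
import org.apache.kafka.common.config.ConfigDef;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;

public class LogConfigDefSketch {
    // Numeric properties bounded below by zero; cleanup.policy restricted
    // to an explicit set of string values.
    static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define("min.cleanable.dirty.ratio", DOUBLE, 0.5,
                atLeast(0), MEDIUM, "")
        .define("cleanup.policy", STRING, "delete",
                ConfigDef.ValidString.in("compact", "delete"), MEDIUM, "");
}
{code}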
[jira] [Commented] (KAFKA-1667) topic-level configuration not validated
[ https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198663#comment-14198663 ]

Gwen Shapira commented on KAFKA-1667:
-------------------------------------

Can you load the patch to RB too? It makes reviewing and commenting much easier.
[jira] [Commented] (KAFKA-1667) topic-level configuration not validated
[ https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198670#comment-14198670 ]

Gwen Shapira commented on KAFKA-1667:
-------------------------------------

I think we can do better validations. For example:

.define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, atLeast(0), MEDIUM, "")

can be:

.define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM, "")

since it's a ratio. Once you have the RB, I can add more detail for other configs.
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14198678#comment-14198678 ] Jun Rao commented on KAFKA-1738: Yes, I started a Kafka broker and ran your script. I was able to see the local logs created. {code}
ls /tmp/kafka-logs/
juntopic-0  test-0  topic3-0  topic_3-0  recovery-point-offset-checkpoint
topic1-0  topic_1-0  topic_4-0  replication-offset-checkpoint
topic2-0  topic_2-0  topic_5-0
{code} The error you saw typically happens when a broker is down. Can you telnet to host DMIPVM on port 9092 while the broker is up? Partitions for topic not created after restart from forced shutdown --- Key: KAFKA-1738 URL: https://issues.apache.org/jira/browse/KAFKA-1738 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1, 0.8.2 Environment: Linux, 2GB RAM, 2 Core CPU Reporter: Pradeep Attachments: 1738.zip We are using the Kafka Topic APIs to create the topic. But in some cases the topic gets created but we don't see the partition-specific files, and when the producer/consumer tries to get the topic metadata it fails with an exception. The same happens if one tries to create the topic using the command line. k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException Steps to reproduce - 1. Stop Kafka using kill -9 on the PID of Kafka 2. Start Kafka 3. Create a topic with a partition count and replication factor of 1. 4. Check the response “Created topic topic_name” 5. Run the list command to verify it is created. 6. Now check the data directory of Kafka. There would not be any files for the newly created topic. We see issues when we are creating new topics. This happens randomly and we don't know the exact reasons. We see the below logs in the controller during the creation of topics which don't have the partition files. 
2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController) [2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController) [2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [JobJTopic,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,667] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-11-03 13:12:50,794] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:DMIPVM,port:9092 (kafka.controller.RequestSendThread) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at kafka.utils.Utils$.read(Utils.scala:381) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [2014-11-03 13:12:50,965] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14198688#comment-14198688 ] Pradeep commented on KAFKA-1738: Can you provide the configuration files? We are using the default configurations. Also, i tried telnet and the broker is up. Partitions for topic not created after restart from forced shutdown --- Key: KAFKA-1738 URL: https://issues.apache.org/jira/browse/KAFKA-1738 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1, 0.8.2 Environment: Linux, 2GB RAM, 2 Core CPU Reporter: Pradeep Attachments: 1738.zip We are using Kafka Topic APIs to create the topic. But in some cases, the topic gets created but we don't see the partition specific files and when producer/consumer tries to get the topic metadata and it fails with exception. Same happens if one tries to create using the command line. k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException Steps to reproduce - 1. Stop kafka using kill -9 PID of Kafka 2. Start Kafka 3. Create Topic with partition and replication factor of 1. 4. Check the response “Created topic topic_name” 5. Run the list command to verify if its created. 6. Now check the data directory of kakfa. There would not be any for the newly created topic. We see issues when we are creating new topics. This happens randomly and we dont know the exact reasons. We see the below logs in controller during the time of creation of topics which doesnt have the partition files. 2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController) [2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController) [2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [JobJTopic,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,667] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-11-03 13:12:50,794] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:DMIPVM,port:9092 (kafka.controller.RequestSendThread) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 
at kafka.utils.Utils$.read(Utils.scala:381) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [2014-11-03 13:12:50,965] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:43;ClientId:id_0-host_null-port_9092;AliveBrokers:id:0,host:DMIPVM,port:9092;PartitionState:[JobJTopic,0] - (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:DMIPVM,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) at
[jira] [Comment Edited] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14198688#comment-14198688 ] Pradeep edited comment on KAFKA-1738 at 11/5/14 5:26 PM: - Can you provide the configuration files? We are using the default configurations and the script tries to create a topic every 10 mins. Also, i tried telnet and the broker is up. was (Author: pradeepbadiger): Can you provide the configuration files? We are using the default configurations. Also, i tried telnet and the broker is up. Partitions for topic not created after restart from forced shutdown --- Key: KAFKA-1738 URL: https://issues.apache.org/jira/browse/KAFKA-1738 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1, 0.8.2 Environment: Linux, 2GB RAM, 2 Core CPU Reporter: Pradeep Attachments: 1738.zip We are using Kafka Topic APIs to create the topic. But in some cases, the topic gets created but we don't see the partition specific files and when producer/consumer tries to get the topic metadata and it fails with exception. Same happens if one tries to create using the command line. k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException Steps to reproduce - 1. Stop kafka using kill -9 PID of Kafka 2. Start Kafka 3. Create Topic with partition and replication factor of 1. 4. Check the response “Created topic topic_name” 5. Run the list command to verify if its created. 6. Now check the data directory of kakfa. There would not be any for the newly created topic. We see issues when we are creating new topics. This happens randomly and we dont know the exact reasons. We see the below logs in controller during the time of creation of topics which doesnt have the partition files. 
2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController) [2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController) [2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [JobJTopic,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,667] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-11-03 13:12:50,794] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:DMIPVM,port:9092 (kafka.controller.RequestSendThread) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at kafka.utils.Utils$.read(Utils.scala:381) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [2014-11-03 13:12:50,965] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:43;ClientId:id_0-host_null-port_9092;AliveBrokers:id:0,host:DMIPVM,port:9092;PartitionState:[JobJTopic,0] - (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0)
[jira] [Commented] (KAFKA-391) Producer request and response classes should use maps
[ https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14198699#comment-14198699 ] Joel Koshy commented on KAFKA-391: -- If there are three partitions, then there will be three message-sets. i.e., producerRequest.data.size will be three, not one. Can you give example _application_ code that reproduces the issue that you are seeing? Producer request and response classes should use maps - Key: KAFKA-391 URL: https://issues.apache.org/jira/browse/KAFKA-391 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Joel Koshy Priority: Blocker Labels: optimization Fix For: 0.8.0 Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch, KAFKA-391-v3.patch, KAFKA-391-v4.patch Producer response contains two arrays of error codes and offsets - the ordering in these arrays correspond to the flattened ordering of the request arrays. It would be better to switch to maps in the request and response as this would make the code clearer and more efficient (right now, linear scans are used in handling producer acks). We can probably do the same in the fetch request/response. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14198713#comment-14198713 ] schandr commented on KAFKA-1738: Here are the additional logs for the same issue Controller.log [2014-11-05 10:31:12,441] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:localhost.localdomain,port:9092 (kafka.controller.RequestSendThread) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at kafka.utils.Utils$.read(Utils.scala:381) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [2014-11-05 10:31:12,445] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 7 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:7;CorrelationId:8;ClientId:id_0-host_null-port_9092;AliveBrokers:id:0,host:localhost.localdomain,port:9092;PartitionState:[topic_30,0] - (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:7),ReplicationFactor:1),AllReplicas:0),[topic_5,0] - (LeaderAndIsrInfo:(Leader:-2,ISR:0,LeaderEpoch:0,ControllerEpoch:7),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:localhost.localdomain,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [2014-11-05 10:31:12,448] INFO [Controller-0-to-broker-0-send-thread], Controller 0 connected to id:0,host:localhost.localdomain,port:9092 for sending state change requests (kafka.controller.RequestSendThread) Server.log [2014-11-05 10:31:12,414] DEBUG Got notification sessionid:0x14980b08c110003 (org.apache.zookeeper.ClientCnxn) [2014-11-05 10:31:12,415] DEBUG Got WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/brokers/topics for sessionid 0x14980b08c110003 (org.apache.zookeeper.ClientCnxn) [2014-11-05 10:31:12,415] DEBUG Received event: WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/brokers/topics (org.I0Itec.zkclient.ZkClient) [2014-11-05 10:31:12,415] DEBUG New event: ZkEvent[Children of /brokers/topics changed sent to kafka.controller.PartitionStateMachine$TopicChangeListener@1fc5681] (org.I0Itec.zkclient.ZkEventThread) [2014-11-05 10:31:12,415] DEBUG Leaving process event (org.I0Itec.zkclient.ZkClient) [2014-11-05 10:31:12,415] DEBUG Delivering event #3 ZkEvent[Children of /brokers/topics changed sent to kafka.controller.PartitionStateMachine$TopicChangeListener@1fc5681] (org.I0Itec.zkclient.ZkEventThread) [2014-11-05 10:31:12,415] DEBUG Got ping response for sessionid: 0x14980b08c110003 after 0ms (org.apache.zookeeper.ClientCnxn) [2014-11-05 10:31:12,416] DEBUG Reading reply sessionid:0x14980b08c110003, packet:: clientPath:null serverPath:null finished:false header:: 257,3 replyHeader:: 257,13610,0 request:: '/brokers/topics,T response:: 
s{6,6,1415130748279,1415130748279,0,23,0,0,0,23,13610} (org.apache.zookeeper.ClientCnxn) [2014-11-05 10:31:12,417] DEBUG Reading reply sessionid:0x14980b08c110003, packet:: clientPath:null serverPath:null finished:false header:: 258,8 replyHeader:: 258,13610,0 request:: '/brokers/topics,T response:: v{'topic_23,'topic_18,'topic_22,'topic_17,'topic_25,'topic_16,'topic_24,'topic_15,'topic_14,'topic_13,'topic_12,'topic_11,'topic_19,'topic_5,'topic_7,'topic_6,'Test1,'topic_10,'topic_9,'topic_8,'topic_20,'topic_30,'topic_21} (org.apache.zookeeper.ClientCnxn) [2014-11-05 10:31:12,420] DEBUG Reading reply sessionid:0x14980b08c110003, packet:: clientPath:null serverPath:null finished:false header:: 259,4 replyHeader:: 259,13610,0 request:: '/brokers/topics/topic_30,F response:: #7b2276657273696f6e223a312c22706172746974696f6e73223a7b2230223a5b305d7d7d,s{13610,13610,1415205072414,1415205072414,0,0,0,0,36,0,13610} (org.apache.zookeeper.ClientCnxn) [2014-11-05 10:31:12,422] DEBUG Replicas assigned to topic [topic_30], partition [0] are [List(0)] (kafka.utils.ZkUtils$) [2014-11-05 10:31:12,422] DEBUG Replicas assigned to topic [topic_30], partition [0] are [List(0)] (kafka.utils.ZkUtils$) [2014-11-05 10:31:12,425] DEBUG Reading reply
[jira] [Comment Edited] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14198713#comment-14198713 ] schandr edited comment on KAFKA-1738 at 11/5/14 5:37 PM: - Here are the additional logs for the same issue Controller.log [2014-11-05 10:31:12,441] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:localhost.localdomain,port:9092 (kafka.controller.RequestSendThread) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at kafka.utils.Utils$.read(Utils.scala:381) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [2014-11-05 10:31:12,445] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 7 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:7;CorrelationId:8;ClientId:id_0-host_null-port_9092;AliveBrokers:id:0,host:localhost.localdomain,port:9092;PartitionState:[topic_30,0] - (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:7),ReplicationFactor:1),AllReplicas:0),[topic_5,0] - (LeaderAndIsrInfo:(Leader:-2,ISR:0,LeaderEpoch:0,ControllerEpoch:7),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:localhost.localdomain,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [2014-11-05 10:31:12,448] INFO [Controller-0-to-broker-0-send-thread], Controller 0 connected to id:0,host:localhost.localdomain,port:9092 for sending state change requests (kafka.controller.RequestSendThread) Server.log [2014-11-05 10:31:12,414] DEBUG Got notification sessionid:0x14980b08c110003 (org.apache.zookeeper.ClientCnxn) [2014-11-05 10:31:12,415] DEBUG Got WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/brokers/topics for sessionid 0x14980b08c110003 (org.apache.zookeeper.ClientCnxn) [2014-11-05 10:31:12,415] DEBUG Received event: WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/brokers/topics (org.I0Itec.zkclient.ZkClient) [2014-11-05 10:31:12,415] DEBUG New event: ZkEvent[Children of /brokers/topics changed sent to kafka.controller.PartitionStateMachine$TopicChangeListener@1fc5681] (org.I0Itec.zkclient.ZkEventThread) [2014-11-05 10:31:12,415] DEBUG Leaving process event (org.I0Itec.zkclient.ZkClient) [2014-11-05 10:31:12,415] DEBUG Delivering event #3 ZkEvent[Children of /brokers/topics changed sent to kafka.controller.PartitionStateMachine$TopicChangeListener@1fc5681] (org.I0Itec.zkclient.ZkEventThread) [2014-11-05 10:31:12,415] DEBUG Got ping response for sessionid: 0x14980b08c110003 after 0ms (org.apache.zookeeper.ClientCnxn) [2014-11-05 10:31:12,416] DEBUG Reading reply sessionid:0x14980b08c110003, packet:: clientPath:null serverPath:null finished:false header:: 257,3 replyHeader:: 257,13610,0 request:: 
'/brokers/topics,T response:: s{6,6,1415130748279,1415130748279,0,23,0,0,0,23,13610} (org.apache.zookeeper.ClientCnxn) [2014-11-05 10:31:12,417] DEBUG Reading reply sessionid:0x14980b08c110003, packet:: clientPath:null serverPath:null finished:false header:: 258,8 replyHeader:: 258,13610,0 request:: '/brokers/topics,T response:: v{'topic_23,'topic_18,'topic_22,'topic_17,'topic_25,'topic_16,'topic_24,'topic_15,'topic_14,'topic_13,'topic_12,'topic_11,'topic_19,'topic_5,'topic_7,'topic_6,'Test1,'topic_10,'topic_9,'topic_8,'topic_20,'topic_30,'topic_21} (org.apache.zookeeper.ClientCnxn) [2014-11-05 10:31:12,420] DEBUG Reading reply sessionid:0x14980b08c110003, packet:: clientPath:null serverPath:null finished:false header:: 259,4 replyHeader:: 259,13610,0 request:: '/brokers/topics/topic_30,F response:: #7b2276657273696f6e223a312c22706172746974696f6e73223a7b2230223a5b305d7d7d,s{13610,13610,1415205072414,1415205072414,0,0,0,0,36,0,13610} (org.apache.zookeeper.ClientCnxn) [2014-11-05 10:31:12,422] DEBUG Replicas assigned to topic [topic_30], partition [0] are [List(0)] (kafka.utils.ZkUtils$) [2014-11-05 10:31:12,422] DEBUG Replicas assigned to topic [topic_30], partition [0] are [List(0)] (kafka.utils.ZkUtils$)
[jira] [Resolved] (KAFKA-1751) handle broker not exists scenario
[ https://issues.apache.org/jira/browse/KAFKA-1751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Pekar resolved KAFKA-1751. - Resolution: Fixed implemented handle broker not exists scenario --- Key: KAFKA-1751 URL: https://issues.apache.org/jira/browse/KAFKA-1751 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1751) handle broker not exists scenario
[ https://issues.apache.org/jira/browse/KAFKA-1751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Pekar updated KAFKA-1751: Attachment: kafka-1751.patch attached patch handle broker not exists scenario --- Key: KAFKA-1751 URL: https://issues.apache.org/jira/browse/KAFKA-1751 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 Attachments: kafka-1751.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work started] (KAFKA-1752) add --replace-broker option
[ https://issues.apache.org/jira/browse/KAFKA-1752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-1752 started by Dmitry Pekar. --- add --replace-broker option --- Key: KAFKA-1752 URL: https://issues.apache.org/jira/browse/KAFKA-1752 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14198747#comment-14198747 ] schandr commented on KAFKA-1738: And I was able to telnet to the host:port. Partitions for topic not created after restart from forced shutdown --- Key: KAFKA-1738 URL: https://issues.apache.org/jira/browse/KAFKA-1738 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1, 0.8.2 Environment: Linux, 2GB RAM, 2 Core CPU Reporter: Pradeep Attachments: 1738.zip We are using Kafka Topic APIs to create the topic. But in some cases, the topic gets created but we don't see the partition specific files and when producer/consumer tries to get the topic metadata and it fails with exception. Same happens if one tries to create using the command line. k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException Steps to reproduce - 1. Stop kafka using kill -9 PID of Kafka 2. Start Kafka 3. Create Topic with partition and replication factor of 1. 4. Check the response “Created topic topic_name” 5. Run the list command to verify if its created. 6. Now check the data directory of kakfa. There would not be any for the newly created topic. We see issues when we are creating new topics. This happens randomly and we dont know the exact reasons. We see the below logs in controller during the time of creation of topics which doesnt have the partition files. 2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController) [2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController) [2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [JobJTopic,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,667] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-11-03 13:12:50,794] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:DMIPVM,port:9092 (kafka.controller.RequestSendThread) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 
at kafka.utils.Utils$.read(Utils.scala:381) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [2014-11-03 13:12:50,965] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:43;ClientId:id_0-host_null-port_9092;AliveBrokers:id:0,host:DMIPVM,port:9092;PartitionState:[JobJTopic,0] - (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:DMIPVM,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at
[jira] [Updated] (KAFKA-1752) add --replace-broker option
[ https://issues.apache.org/jira/browse/KAFKA-1752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1752: - Reviewer: Neha Narkhede add --replace-broker option --- Key: KAFKA-1752 URL: https://issues.apache.org/jira/browse/KAFKA-1752 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14198787#comment-14198787 ] Ewen Cheslack-Postava commented on KAFKA-1745: -- [~junrao] I think he was saying he has a fixed set of producers which are shared among a variable number of threads. He never calls producer.close(), but expects that if he has a pool of, e.g., 5 producers, that the number of KQUEUEs and PIPEs is constant, which makes sense. However, I tried reproducing this and couldn't. My simple test just allocated one producer and spun up threads, each of which sent a message via that producer. lsof showed the KQUEUEs and PIPEs remaining constant. This is exactly what I'd expect since these should only be allocated on the producer's networking thread. Vishal, is there any chance you're leaking producers from the pool by accident? Can you generate a small test case that reproduces the issue? Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared. - Key: KAFKA-1745 URL: https://issues.apache.org/jira/browse/KAFKA-1745 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Environment: Mac OS Mavericks Reporter: Vishal Priority: Critical Hi, I'm using the java client API for Kafka. I wanted to send data to Kafka by using a producer pool as I'm using a sync producer. The thread that sends the data is from the thread pool that grows and shrinks depending on the usage. So, when I try to send data from one thread, 1 KQUEUE and 2 PIPES are created (got this info by using lsof). If I keep using the same thread it's fine but when a new thread sends data to Kafka (using producer.send() ) a new KQUEUE and 2 PIPEs are created. This is okay, but when the thread is cleared from the thread pool and a new thread is created, then new KQUEUEs and PIPEs are created. The problem is that the old ones which were created are not getting destroyed and they are showing up as open files. This is causing a major problem as the number of open file keep increasing and does not decrease. Please suggest any solutions. FYI, the number of TCP connections established from the producer system to the Kafka Broker remain constant throughout. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
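A minimal reproduction along the lines Ewen describes could look like the sketch below, using the 0.8.1-era sync producer API that the reporter is on. The broker address, topic name, and message contents are placeholders; while it runs, lsof on the JVM's pid should show a constant number of KQUEUEs and PIPEs if the single producer is really being shared.
{code}
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SharedProducerRepro {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092"); // placeholder broker
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "sync");
        final Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        for (int i = 0; i < 1000; i++) {
            // Each message is sent from a brand-new, short-lived thread,
            // mimicking a pool that keeps discarding and recreating threads.
            Thread t = new Thread(new Runnable() {
                public void run() {
                    producer.send(new KeyedMessage<String, String>("test", "hello"));
                }
            });
            t.start();
            t.join();
        }
        producer.close();
    }
}
{code}
If the file-descriptor count climbs with this program, the leak is in the client; if it stays flat, the original pool is probably leaking producers as Ewen suggests.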
Review Request 27634: Patch for KAFKA-1667
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27634/ --- Review request for kafka. Bugs: KAFKA-1667 https://issues.apache.org/jira/browse/KAFKA-1667 Repository: kafka Description --- KAFKA-1667 Added topic level config validation Diffs - clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java c4cea2cc072f4db4ce014b63d226431d3766bef1 core/src/main/scala/kafka/admin/TopicCommand.scala 0b2735e7fc42ef9894bef1997b1f06a8ebee5439 core/src/main/scala/kafka/log/LogConfig.scala e48922a97727dd0b98f3ae630ebb0af3bef2373d Diff: https://reviews.apache.org/r/27634/diff/ Testing --- Thanks, Dmytro Kostiuchenko
Re: Review Request 27634: Patch for KAFKA-1667
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27634/ --- (Updated Nov. 5, 2014, 6:42 p.m.) Review request for kafka. Bugs: KAFKA-1667 https://issues.apache.org/jira/browse/KAFKA-1667 Repository: kafka Description --- KAFKA-1667 Added topic level config validation Diffs - clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java c4cea2cc072f4db4ce014b63d226431d3766bef1 core/src/main/scala/kafka/admin/TopicCommand.scala 0b2735e7fc42ef9894bef1997b1f06a8ebee5439 core/src/main/scala/kafka/log/LogConfig.scala e48922a97727dd0b98f3ae630ebb0af3bef2373d Diff: https://reviews.apache.org/r/27634/diff/ Testing --- Thanks, Dmytro Kostiuchenko
Re: Review Request 27634: Patch for KAFKA-1667
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27634/ --- (Updated Nov. 5, 2014, 6:44 p.m.) Review request for kafka. Bugs: KAFKA-1667 https://issues.apache.org/jira/browse/KAFKA-1667 Repository: kafka Description --- KAFKA-1667 Added topic level config validation Diffs (updated) - clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java c4cea2cc072f4db4ce014b63d226431d3766bef1 core/src/main/scala/kafka/admin/TopicCommand.scala 0b2735e7fc42ef9894bef1997b1f06a8ebee5439 core/src/main/scala/kafka/log/LogConfig.scala e48922a97727dd0b98f3ae630ebb0af3bef2373d Diff: https://reviews.apache.org/r/27634/diff/ Testing --- Thanks, Dmytro Kostiuchenko
[jira] [Updated] (KAFKA-1667) topic-level configuration not validated
[ https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmytro Kostiuchenko updated KAFKA-1667: --- Attachment: KAFKA-1667_2014-11-05_19:43:53.patch topic-level configuration not validated Key: KAFKA-1667 URL: https://issues.apache.org/jira/browse/KAFKA-1667 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Labels: newbie Attachments: KAFKA-1667.patch, KAFKA-1667_2014-11-05_19:43:53.patch I was able to set the configuration for a topic to these invalid values: {code} Topic:topic-config-test PartitionCount:1ReplicationFactor:2 Configs:min.cleanable.dirty.ratio=-30.2,segment.bytes=-1,retention.ms=-12,cleanup.policy=lol {code} It seems that the values are saved as long as they are the correct type, but are not validated like the corresponding broker-level properties. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1667) topic-level configuration not validated
[ https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14198829#comment-14198829 ] Dmytro Kostiuchenko commented on KAFKA-1667: Updated reviewboard https://reviews.apache.org/r/27634/diff/ against branch origin/trunk topic-level configuration not validated Key: KAFKA-1667 URL: https://issues.apache.org/jira/browse/KAFKA-1667 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Labels: newbie Attachments: KAFKA-1667.patch, KAFKA-1667_2014-11-05_19:43:53.patch I was able to set the configuration for a topic to these invalid values: {code} Topic:topic-config-test PartitionCount:1ReplicationFactor:2 Configs:min.cleanable.dirty.ratio=-30.2,segment.bytes=-1,retention.ms=-12,cleanup.policy=lol {code} It seems that the values are saved as long as they are the correct type, but are not validated like the corresponding broker-level properties. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 27634: Patch for KAFKA-1667
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27634/#review59997 --- I like how you added validations while preserving the API to minimize code changes. However, looks like not all unit tests pass. For example, DynamicConfigChangeTest is failing. Can you run the tests (./gradlew cleanTest test) and make sure they all pass? Also, I recommend adding extra tests to test the validation. core/src/main/scala/kafka/log/LogConfig.scala https://reviews.apache.org/r/27634/#comment101339 I think the validations look good, so you can remove the TODO core/src/main/scala/kafka/log/LogConfig.scala https://reviews.apache.org/r/27634/#comment101337 I'd copy-paste the docs for each property either from the javadoc for the class or from the documentation and add them. - Gwen Shapira On Nov. 5, 2014, 6:44 p.m., Dmytro Kostiuchenko wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27634/ --- (Updated Nov. 5, 2014, 6:44 p.m.) Review request for kafka. Bugs: KAFKA-1667 https://issues.apache.org/jira/browse/KAFKA-1667 Repository: kafka Description --- KAFKA-1667 Added topic level config validation Diffs - clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java c4cea2cc072f4db4ce014b63d226431d3766bef1 core/src/main/scala/kafka/admin/TopicCommand.scala 0b2735e7fc42ef9894bef1997b1f06a8ebee5439 core/src/main/scala/kafka/log/LogConfig.scala e48922a97727dd0b98f3ae630ebb0af3bef2373d Diff: https://reviews.apache.org/r/27634/diff/ Testing --- Thanks, Dmytro Kostiuchenko
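The extra validation tests suggested in the review could be as simple as asserting that known-bad values are rejected. A rough JUnit 4 sketch, reusing the hypothetical TopicConfigDef wrapper sketched earlier in this thread; the real tests would live in core's Scala test suite, e.g. alongside DynamicConfigChangeTest:
{code}
import java.util.Properties;
import org.apache.kafka.common.config.ConfigException;
import org.junit.Test;

public class TopicConfigValidationTest {
    @Test(expected = ConfigException.class)
    public void rejectsUnknownCleanupPolicy() {
        Properties props = new Properties();
        props.put("cleanup.policy", "lol");   // not in {compact, delete}
        TopicConfigDef.validate(props);
    }

    @Test(expected = ConfigException.class)
    public void rejectsNegativeRetention() {
        Properties props = new Properties();
        props.put("retention.ms", "-12");     // violates the atLeast(0) validator
        TopicConfigDef.validate(props);
    }
}
{code}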
[jira] [Updated] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] schandr updated KAFKA-1738: --- Attachment: ServerLogForSuccessfulTopicCreation.txt ServerLogForFailedTopicCreation.txt Please see the attached serverlog excerpts for a failed and successful topic creation. For a failed Topic creation the log files are not getting created under the kafka.log.dirs folder. Looking at the successful topic creation server log, I can see the trace logs for the partition log creation, which is missing from the failed server log. Partitions for topic not created after restart from forced shutdown --- Key: KAFKA-1738 URL: https://issues.apache.org/jira/browse/KAFKA-1738 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1, 0.8.2 Environment: Linux, 2GB RAM, 2 Core CPU Reporter: Pradeep Attachments: 1738.zip, ServerLogForFailedTopicCreation.txt, ServerLogForSuccessfulTopicCreation.txt We are using Kafka Topic APIs to create the topic. But in some cases, the topic gets created but we don't see the partition specific files and when producer/consumer tries to get the topic metadata and it fails with exception. Same happens if one tries to create using the command line. k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException Steps to reproduce - 1. Stop kafka using kill -9 PID of Kafka 2. Start Kafka 3. Create Topic with partition and replication factor of 1. 4. Check the response “Created topic topic_name” 5. Run the list command to verify if its created. 6. Now check the data directory of kakfa. There would not be any for the newly created topic. We see issues when we are creating new topics. This happens randomly and we dont know the exact reasons. We see the below logs in controller during the time of creation of topics which doesnt have the partition files. 
2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController) [2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController) [2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [JobJTopic,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,667] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-11-03 13:12:50,794] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:DMIPVM,port:9092 (kafka.controller.RequestSendThread) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at kafka.utils.Utils$.read(Utils.scala:381) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [2014-11-03 13:12:50,965] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:43;ClientId:id_0-host_null-port_9092;AliveBrokers:id:0,host:DMIPVM,port:9092;PartitionState:[JobJTopic,0] -
[jira] [Updated] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] schandr updated KAFKA-1738: --- Attachment: ServerLogForFailedTopicCreation.txt Partitions for topic not created after restart from forced shutdown --- Key: KAFKA-1738 URL: https://issues.apache.org/jira/browse/KAFKA-1738 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1, 0.8.2 Environment: Linux, 2GB RAM, 2 Core CPU Reporter: Pradeep Attachments: 1738.zip, ServerLogForFailedTopicCreation.txt, ServerLogForFailedTopicCreation.txt, ServerLogForSuccessfulTopicCreation.txt We are using Kafka Topic APIs to create the topic. But in some cases, the topic gets created but we don't see the partition specific files and when producer/consumer tries to get the topic metadata and it fails with exception. Same happens if one tries to create using the command line. k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException Steps to reproduce - 1. Stop kafka using kill -9 PID of Kafka 2. Start Kafka 3. Create Topic with partition and replication factor of 1. 4. Check the response “Created topic topic_name” 5. Run the list command to verify if its created. 6. Now check the data directory of kakfa. There would not be any for the newly created topic. We see issues when we are creating new topics. This happens randomly and we dont know the exact reasons. We see the below logs in controller during the time of creation of topics which doesnt have the partition files. 2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController) [2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController) [2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [JobJTopic,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,667] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-11-03 13:12:50,794] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:DMIPVM,port:9092 (kafka.controller.RequestSendThread) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 
at kafka.utils.Utils$.read(Utils.scala:381) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [2014-11-03 13:12:50,965] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:43;ClientId:id_0-host_null-port_9092;AliveBrokers:id:0,host:DMIPVM,port:9092;PartitionState:[JobJTopic,0] - (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:DMIPVM,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) at
[jira] [Commented] (KAFKA-1753) add --decommission-broker option
[ https://issues.apache.org/jira/browse/KAFKA-1753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199035#comment-14199035 ] Dmitry Pekar commented on KAFKA-1753: - The proposed implementation is the following: add a --remove-broker CLI option whose value is a broker id. For example, --remove-broker 1 means remove broker[id=1] from the cluster. Please discuss. add --decommission-broker option Key: KAFKA-1753 URL: https://issues.apache.org/jira/browse/KAFKA-1753 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1752) add --replace-broker option
[ https://issues.apache.org/jira/browse/KAFKA-1752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199032#comment-14199032 ] Dmitry Pekar commented on KAFKA-1752: - The proposed implementation is the following: add a --replace-broker CLI option whose value is a comma-separated pair of broker ids (src,dst). For example, --replace-broker 1,2 means replace broker[id=1] with broker[id=2] in the cluster. Please discuss. add --replace-broker option --- Key: KAFKA-1752 URL: https://issues.apache.org/jira/browse/KAFKA-1752 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1684) Implement TLS/SSL authentication
[ https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199087#comment-14199087 ] Michael Herstine commented on KAFKA-1684: - Hi Ivan, Thanks -- adding SSL support is complex, and the JSSE isn't always helpful. I just started reading the patch (class SSLSocketChannel in particular) and had a few questions: 1. I think we should be validating the peer hostname at some point. JSSE doesn't do this for us: "When using raw SSLSockets/SSLEngines you should always check the peer's credentials before sending any data. The SSLSocket and SSLEngine classes do not automatically verify that the hostname in a URL matches the hostname in the peer's credentials. An application could be exploited with URL spoofing if the hostname is not verified." http://docs.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html 2. What are your thoughts on handling certificate revocation? 3. I notice that when creating both client and server SSLSocketChannel instances, you set the supported protocol to SSLv3 -- can I ask why that choice (and not, say, the latest version)? Implement TLS/SSL authentication Key: KAFKA-1684 URL: https://issues.apache.org/jira/browse/KAFKA-1684 Project: Kafka Issue Type: Sub-task Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Assignee: Ivan Lyutov Attachments: KAFKA-1684.patch Add an SSL port to the configuration and advertise this as part of the metadata request. If the SSL port is configured the socket server will need to add a second Acceptor thread to listen on it. Connections accepted on this port will need to go through the SSL handshake prior to being registered with a Processor for request processing. SSL requests and responses may need to be wrapped or unwrapped using the SSLEngine that was initialized by the acceptor. This wrapping and unwrapping is very similar to what will need to be done for SASL-based authentication schemes. We should have a uniform interface that covers both of these and we will need to store the instance in the session with the request. The socket server will have to use this object when reading and writing requests. We will need to take care with the FetchRequests as the current FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we can only use this optimization for unencrypted sockets that don't require userspace translation (wrapping). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
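Point 1 can be addressed with plain JSSE since Java 7 by asking the SSLEngine itself to match the peer certificate against the target host. The sketch below shows one way to do it, offered as an illustration rather than something taken from the patch under review:
{code}
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

public final class SslEngines {
    // Creates a client-mode engine that verifies the peer's certificate
    // against the host it was created for during the handshake.
    public static SSLEngine clientEngine(SSLContext ctx, String host, int port) {
        SSLEngine engine = ctx.createSSLEngine(host, port);
        engine.setUseClientMode(true);
        SSLParameters params = engine.getSSLParameters();
        // "HTTPS" applies RFC 2818-style hostname matching; without this,
        // JSSE accepts any certificate the trust manager considers valid.
        params.setEndpointIdentificationAlgorithm("HTTPS");
        engine.setSSLParameters(params);
        return engine;
    }
}
{code}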
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199132#comment-14199132 ] schandr commented on KAFKA-1738: OK. Here is my understanding; this is probably a bug. 1. For any request that the controller sends to the broker, it uses the BlockingChannel, which is initialized with the controller.socket.timeout.ms value specified in server.properties. 2. Once the channel is established, the RequestSendThread uses it to send requests such as LeaderAndIsr and UpdateMetadata without checking whether the channel is still open. 3. Based on that timeout value, the socket might have timed out. The following code in the RequestSendThread calls connectToBroker again on catching the exception, but does not resend the failed request. If the request happens to be the LeaderAndIsr for a new partition, this results in the log directory not being created, or in other errors that make the producer or consumer throw exceptions when they try to produce to or consume from the affected topic. {code}
var isSendSuccessful = false
while (isRunning.get() && !isSendSuccessful) {
  // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
  // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
  try {
    channel.send(request)
    isSendSuccessful = true
  } catch {
    case e: Throwable =>
      // if the send was not successful, reconnect to broker and resend the message
      error(("Controller %d epoch %d failed to send request %s to broker %s. " +
        "Reconnecting to broker.").format(controllerId, controllerContext.epoch, request.toString, toBroker.toString()), e)
      channel.disconnect()
      connectToBroker(toBroker, channel)
      isSendSuccessful = false
      // backoff before retrying the connection and send
      Utils.swallow(Thread.sleep(300))
  }
}
{code} In the code above, after reconnecting to the broker, it should also resend the failed request. Partitions for topic not created after restart from forced shutdown --- Key: KAFKA-1738 URL: https://issues.apache.org/jira/browse/KAFKA-1738 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1, 0.8.2 Environment: Linux, 2GB RAM, 2 Core CPU Reporter: Pradeep Attachments: 1738.zip, ServerLogForFailedTopicCreation.txt, ServerLogForFailedTopicCreation.txt, ServerLogForSuccessfulTopicCreation.txt We are using the Kafka Topic APIs to create the topic. But in some cases the topic gets created but we don't see the partition-specific files, and when the producer/consumer tries to get the topic metadata it fails with an exception. The same happens if one tries to create the topic using the command line. k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException Steps to reproduce - 1. Stop Kafka using kill -9 on the PID of Kafka 2. Start Kafka 3. Create a topic with a partition count and replication factor of 1. 4. Check the response “Created topic topic_name” 5. Run the list command to verify it is created. 6. Now check the data directory of Kafka. There would not be any files for the newly created topic. We see issues when we are creating new topics. This happens randomly and we don't know the exact reasons. We see the below logs in the controller during the creation of topics which don't have the partition files. 
[2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)]
[jira] [Comment Edited] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199132#comment-14199132 ] schandr edited comment on KAFKA-1738 at 11/5/14 9:57 PM: - OK. Here is my understanding; this is probably a bug.
1. For any requests that the controller sends to the broker, it uses the BlockingChannel, which is initialized with the controller.socket.timeout.ms value specified in server.properties.
2. Once the channel is established, the RequestSendThread uses this channel to send requests such as LeaderAndIsr and UpdateMetadata without checking whether the channel is still open.
3. Depending on that value, the socket might have timed out. The following code in RequestSendThread calls connectToBroker again on catching the exception, but does not send the failed request. If the request happens to be LeaderAndIsr for a new partition, this results in the log directory not being created (or other errors), which in turn causes producers or consumers to throw exceptions when they try to produce or consume data from the failed topic.

    var isSendSuccessful = false
    while (isRunning.get() && !isSendSuccessful) {
      // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
      // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
      try {
        channel.send(request)
        isSendSuccessful = true
      } catch {
        case e: Throwable =>
          // if the send was not successful, reconnect to broker and resend the message
          error(("Controller %d epoch %d failed to send request %s to broker %s. " +
            "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
            request.toString, toBroker.toString()), e)
          channel.disconnect()
          connectToBroker(toBroker, channel)
          isSendSuccessful = false
          // backoff before retrying the connection and send
          Utils.swallow(Thread.sleep(300))
      }
    }

In the code above, after reconnecting to the broker, it should also resend the failed request. At least the inline comment says so: "// if the send was not successful, reconnect to broker and resend the message". So the code in the catch block should be:

    catch {
      case e: Throwable =>
        // if the send was not successful, reconnect to broker and resend the message
        error(("Controller %d epoch %d failed to send request %s to broker %s. " +
          "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
          request.toString, toBroker.toString()), e)
        channel.disconnect()
        connectToBroker(toBroker, channel)
        channel.send(request)
        isSendSuccessful = false
        // backoff before retrying the connection and send
        Utils.swallow(Thread.sleep(300))
    }

was (Author: schandr): OK. Here is my understanding; this is probably a bug.
1. For any requests that the controller sends to the broker, it uses the BlockingChannel, which is initialized with the controller.socket.timeout.ms value specified in server.properties.
2. Once the channel is established, the RequestSendThread uses this channel to send requests such as LeaderAndIsr and UpdateMetadata without checking whether the channel is still open.
3. Depending on that value, the socket might have timed out. The following code in RequestSendThread calls connectToBroker again on catching the exception, but does not send the failed request. If the request happens to be LeaderAndIsr for a new partition, this results in the log directory not being created (or other errors), which in turn causes producers or consumers to throw exceptions when they try to produce or consume data from the failed topic.
    var isSendSuccessful = false
    while (isRunning.get() && !isSendSuccessful) {
      // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
      // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
      try {
        channel.send(request)
        isSendSuccessful = true
      } catch {
        case e: Throwable =>
          // if the send was not successful, reconnect to broker and resend the message
          error(("Controller %d epoch %d failed to send request %s to broker %s. " +
            "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
            request.toString, toBroker.toString()), e)
          channel.disconnect()
          connectToBroker(toBroker, channel)
          isSendSuccessful = false
          // backoff before retrying the connection and send
Re: Review Request 27534: Patch for KAFKA-1746
On Nov. 5, 2014, 1 a.m., Guozhang Wang wrote: Where will testcaseEnv.validationStatusDict["Test completed"] be used? Ewen Cheslack-Postava wrote: That's where all the validation results (the test's assertions) are stored. It gets scanned through at the end of the test run in system_test_runner.py to generate the report and count the number of failures. The setup is a bit confusing because the same dict is named by both testcaseEnv.validationStatusDict and testcaseEnv.testcaseResultsDict["validation_status"] (see TestcaseEnv's constructor). I see. Thanks. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27534/#review59898 --- On Nov. 3, 2014, 7:46 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27534/ --- (Updated Nov. 3, 2014, 7:46 p.m.) Review request for kafka. Bugs: KAFKA-1746 https://issues.apache.org/jira/browse/KAFKA-1746 Repository: kafka Description --- KAFKA-1746 Make system tests return a useful exit code. KAFKA-1746 Check the exit code when running DumpLogSegments to verify data. Diffs - system_test/mirror_maker_testsuite/mirror_maker_test.py c0117c64cbb7687ca8fbcec6b5c188eb880300ef system_test/offset_management_testsuite/offset_management_test.py 12b5cd25140e1eb407dd57eef63d9783257688b2 system_test/replication_testsuite/replica_basic_test.py 660006cc253bbae3e7cd9f02601f1c1937dd1714 system_test/system_test_runner.py ee7aa252333553e8eb0bc046edf968ec99dddb70 system_test/utils/kafka_system_test_utils.py 1093b660ebd0cb5ab6d3731d26f151e1bf717f8a Diff: https://reviews.apache.org/r/27534/diff/ Testing --- Thanks, Ewen Cheslack-Postava
Re: [Java New Producer] Changing Partition number and its Impact
Hi Jay or Kafka Dev Team, any suggestions on how I can deal with this situation of expanding partitions with the new Java producer, for scalability (on the consumer side)? Thanks, Bhavesh

On Tue, Nov 4, 2014 at 7:08 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Also, to add to this: the old (Scala-based) producer is not impacted by the partition changes. So an important scalability feature is being taken away if you do not plan for expansion from the beginning with the new Java producer. Thanks, Bhavesh

On Tue, Nov 4, 2014 at 4:56 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jay, the fundamental problem is that the batch size is already configured and the producers are running in production with the given configuration (the previous values were just samples). How do we increase partitions for topics when the batch size would exceed the configured buffer limit? Yes, had we planned for a smaller batch size we could do this, but we cannot once the producers are already running. Have you faced this problem at LinkedIn or anywhere else? Thanks, Bhavesh

On Tue, Nov 4, 2014 at 4:25 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Bhavesh, No, there isn't such a setting. But what I am saying is that I don't think you really need that feature. I think instead you can use a 32k batch size with your 64M memory limit. This should mean you can have up to 2048 batches in flight. Assuming one batch in flight and one being added to at any given time, this should work well for up to ~1000 partitions, rather than trying to do anything dynamic. So, assuming each producer sends to just one topic, you would be fine as long as that topic had fewer than 1000 partitions. If you wanted to add more you would need to add memory on the producers. -Jay

On Tue, Nov 4, 2014 at 4:04 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jay, I agree with and understood what you mentioned in the previous email. But when you have 5000+ producers running in the cloud (I am sure LinkedIn has many more, and needs to increase partitions for scalability), then none of the running producers will be able to send any data. So is there any feature or setting that makes sense here, to shrink the batch size to fit the increase? I am sure others will face the same issue. Had I configured block.on.buffer.full=true it would be even worse and would block application threads. Our use case is that the *logger.log(msg)* method cannot be blocked, so that is why we set the configuration to false. So I am sure others will run into this same issue. I am trying to find the optimal solution and a recommendation from the Kafka dev team for this particular use case (which may become common). Thanks, Bhavesh

On Tue, Nov 4, 2014 at 3:12 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Bhavesh, Here is what your configuration means:

    buffer.memory=64MB          # don't use more than 64MB of memory
    batch.size=1MB              # allocate a 1MB buffer for each partition with data
    block.on.buffer.full=false  # immediately throw an exception if there is not enough memory to create a new buffer

Not sure what linger time you have set. So what you see makes sense. If you have 1MB buffers and 32 partitions then you will have approximately 32MB of memory in use (actually a bit more than this, since one buffer will be filling while another is sending). If you have 128 partitions then you will try to use 128MB, and since you have configured the producer to fail when you reach 64 (rather than waiting for memory to become available), that is what happens. I suspect you want a smaller batch size; more than 64k is usually not going to help throughput. -Jay

On Tue, Nov 4, 2014 at 11:39 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Dev, With the new producer, we have to change the number of partitions for a topic, and we hit BufferExhaustedException. Here is an example: we have set 64MiB of buffer memory, 32 partitions, and a 1MiB batch size. But when we increase the partitions to 128, it throws BufferExhaustedException right away (non-key-based messages). The buffer is allocated based on batch.size. There is a very common need to auto-calculate the batch size when partitions increase, because we have about ~5000 boxes and it is not practical to redeploy code on all machines just to expand partitions for scalability. What options are available while the new producer is running, partitions need to increase, and there is not enough buffer to allocate a batch for the additional partitions?

    buffer.memory=64MiB
    batch.size=1MiB
    block.on.buffer.full=false

Thanks, Bhavesh
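To make the arithmetic in the thread concrete, here is a minimal sketch of the configuration being discussed, assuming the 0.8.2-era new Java producer (the broker address, topic name, and serializer settings are illustrative assumptions, not values from the thread):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.BufferExhaustedException;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class BufferMathDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.put("buffer.memory", "67108864");  // 64 MiB total, as in the thread
            props.put("batch.size", "1048576");      // 1 MiB buffer per partition with data
            props.put("block.on.buffer.full", "false"); // fail fast instead of blocking the caller
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

            // 64 MiB / 1 MiB = at most 64 open batches. With 32 partitions the math works;
            // with 128 partitions allocation exceeds buffer.memory and send() fails fast.
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
            try {
                producer.send(new ProducerRecord<byte[], byte[]>("some-topic", new byte[512])); // topic name assumed
            } catch (BufferExhaustedException e) {
                // thrown when block.on.buffer.full=false and no batch memory is available
            } finally {
                producer.close();
            }
        }
    }

With these numbers, 64 MiB / 1 MiB leaves room for only 64 open batches, which is why 32 partitions work but 128 fail immediately; Jay's suggested 32k batch size would raise that ceiling to 2048 batches.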
[jira] [Created] (KAFKA-1756) never allow the replica fetch size to be less than the max message size
Joe Stein created KAFKA-1756: Summary: never allow the replica fetch size to be less than the max message size Key: KAFKA-1756 URL: https://issues.apache.org/jira/browse/KAFKA-1756 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1, 0.8.2 Reporter: Joe Stein Priority: Blocker Fix For: 0.8.2 There exists a very hazardous scenario where, if max.message.bytes is greater than replica.fetch.max.bytes, the message will never replicate. This will bring the ISR down to 1 (eventually/quickly, once replica.lag.max.messages is reached). If during this window the leader itself goes out of the ISR, then the new leader will commit the last offset it replicated. This is also bad for sync producers with -1 ack, because they will all block (a herd effect caused upstream) in this scenario too. The fix here is two-fold: 1) when setting max.message.bytes using kafka-topics we must first check each and every broker (which will need some thought about how to do this because of the TopicCommand ZK notification) that max.message.bytes <= replica.fetch.max.bytes, and if it is NOT then DO NOT create the topic; 2) if you change this in server.properties then the broker should not start if max.message.bytes > replica.fetch.max.bytes. This does beg the question/issue somewhat about centralizing certain/some/all configurations so that inconsistencies do not occur (where broker 1 has max.message.bytes > replica.fetch.max.bytes but broker 2 has max.message.bytes <= replica.fetch.max.bytes because of an error in the properties). I do not want to conflate this ticket, but I think it is worth mentioning/bringing up here as it is a good example of where it could make sense. I set this as BLOCKER for 0.8.2-beta because we did so much work to enable consistency vs availability and 0 data loss; this corner case should be fixed as part of 0.8.2-final. Also, I could go one step further (though I would not consider this part a blocker for 0.8.2, but I am interested in what other folks think) about a consumer fetch size check, so that if the message max is increased, messages will no longer be consumed (since the consumer fetch max would be < max.message.bytes). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
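A minimal sketch of what part 2) of the proposed fix could look like as a startup-time guard; the class and method below are hypothetical, not Kafka's actual config code (note the broker-level property is usually spelled message.max.bytes, while max.message.bytes is the per-topic override):

    import java.util.Properties;

    public class ReplicaFetchSizeGuard {
        /**
         * Illustrative startup-time guard: refuse to boot a broker whose maximum
         * message size exceeds what replica fetchers can pull, since such a message
         * could never be replicated and would shrink the ISR as described above.
         */
        public static void validateOrFail(Properties serverProps) {
            // Property names as discussed in the ticket; default values here are illustrative.
            long maxMessageBytes =
                    Long.parseLong(serverProps.getProperty("message.max.bytes", "1000000"));
            long replicaFetchMaxBytes =
                    Long.parseLong(serverProps.getProperty("replica.fetch.max.bytes", "1048576"));
            if (maxMessageBytes > replicaFetchMaxBytes) {
                throw new IllegalArgumentException(
                        "message.max.bytes (" + maxMessageBytes + ") must not exceed "
                                + "replica.fetch.max.bytes (" + replicaFetchMaxBytes + ")");
            }
        }
    }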
Re: [Java New Producer] Changing Partition number and its Impact
Bhavesh, Wouldn't using the default batch size of 16k have avoided this problem entirely? I think the best solution now is just to change the configuration. What I am saying is that it is unlikely you will need to do this again; the problem is just that 1MB partition batches are quite large, so you run out of memory very quickly with that configuration. I agree that the Scala producer doesn't have this problem, but it actually doesn't really let you control the memory use or the request size very effectively, which I would argue is a much bigger problem. Once you introduce those controls you have to configure how to make use of them, which is what this is about. -Jay

On Wed, Nov 5, 2014 at 3:45 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jay or Kafka Dev Team, any suggestions on how I can deal with this situation of expanding partitions with the new Java producer, for scalability (on the consumer side)? Thanks, Bhavesh
[jira] [Commented] (KAFKA-1684) Implement TLS/SSL authentication
[ https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199501#comment-14199501 ] Jun Rao commented on KAFKA-1684: Ivan, Thanks for the patch. Some high level comments. 1. SocketOption is only in java 7 and beyond. Do we want to stop supporting java 6? 2. SSLConnectionConfig is hard-coded to load from a config file. Since the Kafka broker may be started from a container which may have its own way of dealing with configuration, we should pass in the SSL specific configs through the constructor of KafkaServer. It seems that the easiest way is to add those SSL configs to KafkaConfig directly. 3. You added a new ZK path to register the broker secure port. I am wondering if it's simpler to just add that info to the value of the existing broker path in ZK. 4. You added a new request type TopicMetadataForChannelRequest. Again, I am wondering if it's simpler to just add the secure port in the existing TopicMetadataResponse, instead of creating a new type of request. 5. You evolved the request format of UpdateMetadataRequest. For backward compatibility, we need to bump up the version number and make sure that the receiver can parse both versions of the request. 6. We need to think through how to do rolling upgrades with this change. This is a bit tricky since the broker registry in ZK is read by the controller, which can be of any broker. So, if a broker is registered in the new format in ZK and then the controller tries to read it, but doesn't understand its format (since the controller broker hasn't been upgraded), then things will break. Similarly, UpdateMetadataRequest is used for communication btw brokers, if a broker receives a request but doesn't understand its format, bad things will happen. One way to do this kind of upgrade is to do this in two steps. In step one, we upgrade the software for each broker, but disallow the broker to use the new ZK or request format. In step two, we make a config change to every broker to enable the brokers to start using the new format. 7. Do we plan to use the same secure port for either SSL or SASL? If so, this port should probably be named in a more general way. 8. Which port should the follower use? If both the plaintext and the secure port are configured, should we just pick one (e.g., secure) or make it configurable? 9. What's the plan for unit testing this feature? Some low level comments. 10. SSLSocketChannel: Some of the logic in this class is a bit hard to follow. 10.1 It's not clear to me what values initialized can have and what those values mean. 10.2 It's not very clear to me why we need to write to a socket (through writeIfReadyAndNeeded()) while reading from a socket (when initialized != -1). 10.3 Could you explain a bit what isExtraReadRequired() and Processor.readExtraIfNeeded() do? 10.4 We will need a non-blocking version of connect(), for the new java clients. 10.5 It's a bit weird to simulate blocking read using a selector. Should we just do the read directly since the socket has already been properly configured with blocking or not? It would probably be useful to first document the config/request/ZK changes in a wiki and also outline the upgrade path. 
Implement TLS/SSL authentication Key: KAFKA-1684 URL: https://issues.apache.org/jira/browse/KAFKA-1684 Project: Kafka Issue Type: Sub-task Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Assignee: Ivan Lyutov Attachments: KAFKA-1684.patch Add an SSL port to the configuration and advertise this as part of the metadata request. If the SSL port is configured the socket server will need to add a second Acceptor thread to listen on it. Connections accepted on this port will need to go through the SSL handshake prior to being registered with a Processor for request processing. SSL requests and responses may need to be wrapped or unwrapped using the SSLEngine that was initialized by the acceptor. This wrapping and unwrapping is very similar to what will need to be done for SASL-based authentication schemes. We should have a uniform interface that covers both of these and we will need to store the instance in the session with the request. The socket server will have to use this object when reading and writing requests. We will need to take care with the FetchRequests as the current FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we can only use this optimization for unencrypted sockets that don't require userspace translation (wrapping). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
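As context for the wrap/unwrap step described above, here is a minimal, illustrative sketch of driving javax.net.ssl.SSLEngine for an outgoing buffer; this is not Kafka code, and handshake-status handling and buffer resizing are omitted for brevity:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    import javax.net.ssl.SSLEngine;
    import javax.net.ssl.SSLEngineResult;
    import javax.net.ssl.SSLException;

    public class SslWriteSketch {
        /** Encrypt the application bytes in src and write the TLS records to the channel. */
        static void writeEncrypted(SSLEngine engine, SocketChannel channel, ByteBuffer src)
                throws IOException {
            // The destination buffer must hold at least one full TLS record.
            ByteBuffer netBuffer = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
            while (src.hasRemaining()) {
                netBuffer.clear();
                SSLEngineResult result = engine.wrap(src, netBuffer); // plaintext -> TLS record
                if (result.getStatus() != SSLEngineResult.Status.OK) {
                    throw new SSLException("wrap failed: " + result.getStatus());
                }
                netBuffer.flip();
                while (netBuffer.hasRemaining()) {
                    channel.write(netBuffer); // push the encrypted bytes to the socket
                }
            }
        }
    }

The extra userspace copy through the SSLEngine is also why the FileChannel.transferTo zero-copy path mentioned above cannot be used on encrypted connections.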
[jira] [Commented] (KAFKA-1753) add --decommission-broker option
[ https://issues.apache.org/jira/browse/KAFKA-1753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199524#comment-14199524 ] Gwen Shapira commented on KAFKA-1753: - +1 Great idea IMO. Will make life easier for a pretty common use-case. I like the --decommission-broker name better than --remove-broker (just because it matches the Hadoop terminology) add --decommission-broker option Key: KAFKA-1753 URL: https://issues.apache.org/jira/browse/KAFKA-1753 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1753) add --decommission-broker option
[ https://issues.apache.org/jira/browse/KAFKA-1753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199564#comment-14199564 ] Joe Stein commented on KAFKA-1753: -- +1 naming the option --decommission-broker add --decommission-broker option Key: KAFKA-1753 URL: https://issues.apache.org/jira/browse/KAFKA-1753 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
No longer supporting Java 6, if? when?
This has been coming up in a lot of projects, and for other reasons too, so I wanted to kick off the discussion about if/when we end support for Java 6. Besides any APIs we may want to use in >= 7, we also currently compile our release binaries for 6.

/***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
***/
[jira] [Commented] (KAFKA-1752) add --replace-broker option
[ https://issues.apache.org/jira/browse/KAFKA-1752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199595#comment-14199595 ] Gwen Shapira commented on KAFKA-1752: - +1 to having this feature. Maybe a more readable option would be: --replace-broker 1 --with-broker 2 add --replace-broker option --- Key: KAFKA-1752 URL: https://issues.apache.org/jira/browse/KAFKA-1752 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-1752) add --replace-broker option
[ https://issues.apache.org/jira/browse/KAFKA-1752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199595#comment-14199595 ] Gwen Shapira edited comment on KAFKA-1752 at 11/6/14 1:46 AM: -- +1 to having this feature. Maybe a more readable option would be: --replace-broker 1 --with-broker 2 (not a committer, so my opinion is not binding) was (Author: gwenshap): +1 to having this feature. Maybe a more readable option would be: --replace-broker 1 --with-broker 2 add --replace-broker option --- Key: KAFKA-1752 URL: https://issues.apache.org/jira/browse/KAFKA-1752 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1751) handle broker not exists scenario
[ https://issues.apache.org/jira/browse/KAFKA-1751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199600#comment-14199600 ] Jun Rao commented on KAFKA-1751: Could you add a bit more description of what the problem is? Also, is this really resolved? handle broker not exists scenario --- Key: KAFKA-1751 URL: https://issues.apache.org/jira/browse/KAFKA-1751 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 Attachments: kafka-1751.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1729) add doc for Kafka-based offset management in 0.8.2
[ https://issues.apache.org/jira/browse/KAFKA-1729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-1729: -- Attachment: KAFKA-1782-doc-v2.patch add doc for Kafka-based offset management in 0.8.2 -- Key: KAFKA-1729 URL: https://issues.apache.org/jira/browse/KAFKA-1729 Project: Kafka Issue Type: Sub-task Reporter: Jun Rao Assignee: Joel Koshy Attachments: KAFKA-1782-doc-v1.patch, KAFKA-1782-doc-v2.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [Java New Producer] Changing Partition number and its Impact
Hi Jay, thanks for the response. I feel this needs to be documented as a limitation of the new Java producer: the batch size vs buffer size impact when increasing partitions. I agree that you get fine-grained control (which is great), but you ultimately lose the ability to increase partitions for scalability, which I think is the greater benefit, without impacting producers running live in production. I would argue, from the customer perspective, for a flag called *auto.update.batch.size*, relative to the buffer size, which would have the new producer recalculate the batch size when a partition increase is detected, so the running code does not throw this exception or block application threads waiting for memory they will never get (this is just my suggestion). Do you agree? Should I file a JIRA for this? I am sure others will run into this problem sooner or later. Thanks, Bhavesh
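To illustrate the recalculation that the proposed auto.update.batch.size flag implies (the flag is purely hypothetical and does not exist in Kafka; the class below is a sketch of the arithmetic only):

    public class BatchSizeAutoTuner {
        /**
         * Hypothetical recalculation behind the proposed auto.update.batch.size flag
         * (this flag does not exist in Kafka): shrink the per-partition batch size so
         * that one open batch per partition still fits in the total buffer memory.
         */
        static int recalcBatchSize(int configuredBatchSize, long bufferMemory, int partitionCount) {
            long perPartitionCap = bufferMemory / Math.max(1, partitionCount);
            return (int) Math.min(configuredBatchSize, perPartitionCap);
        }

        public static void main(String[] args) {
            long bufferMemory = 64L * 1024 * 1024; // 64 MiB, as in the thread
            // 32 partitions: the configured 1 MiB batch still fits (cap is 2 MiB)
            System.out.println(recalcBatchSize(1024 * 1024, bufferMemory, 32));  // 1048576
            // 128 partitions: cap drops to 512 KiB, avoiding BufferExhaustedException
            System.out.println(recalcBatchSize(1024 * 1024, bufferMemory, 128)); // 524288
        }
    }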
[jira] [Updated] (KAFKA-1729) add doc for Kafka-based offset management in 0.8.2
[ https://issues.apache.org/jira/browse/KAFKA-1729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-1729: -- Status: Patch Available (was: In Progress) add doc for Kafka-based offset management in 0.8.2 -- Key: KAFKA-1729 URL: https://issues.apache.org/jira/browse/KAFKA-1729 Project: Kafka Issue Type: Sub-task Reporter: Jun Rao Assignee: Joel Koshy Attachments: KAFKA-1782-doc-v1.patch, KAFKA-1782-doc-v2.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1729) add doc for Kafka-based offset management in 0.8.2
[ https://issues.apache.org/jira/browse/KAFKA-1729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199616#comment-14199616 ] Joel Koshy commented on KAFKA-1729: --- Incorporated comments. I would actually like to get rid of most of that wiki page I added (that is referenced from the doc) and recommend that people use the new OffsetClient utility that is being added in KAFKA-1013. I think it would be good to incorporate that into 0.8.2. I will review that patch again. add doc for Kafka-based offset management in 0.8.2 -- Key: KAFKA-1729 URL: https://issues.apache.org/jira/browse/KAFKA-1729 Project: Kafka Issue Type: Sub-task Reporter: Jun Rao Assignee: Joel Koshy Attachments: KAFKA-1782-doc-v1.patch, KAFKA-1782-doc-v2.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1754) KOYA - Kafka on YARN
[ https://issues.apache.org/jira/browse/KAFKA-1754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199617#comment-14199617 ] Gwen Shapira commented on KAFKA-1754: - Looking forward to seeing the patch. Streaming applications such as Samza, SparkStreaming and DataTorrent will benefit from running their workers on the same nodes as the partitions they are consuming data from. This is now possible in YARN. My main concern is whether it's possible to prevent YARN from spawning a new container when a broker goes down, because starting the broker on a new node has the huge overhead of replicating all the data over. We may prefer that this does not happen automatically, but only after someone has verified that the original broker is truly gone. KOYA - Kafka on YARN Key: KAFKA-1754 URL: https://issues.apache.org/jira/browse/KAFKA-1754 Project: Kafka Issue Type: New Feature Reporter: Thomas Weise Attachments: DT-KOYA-Proposal- JIRA.pdf YARN (Hadoop 2.x) has enabled clusters to be used for a variety of workloads, emerging as a distributed operating system for big data applications. Initiatives are under way to bring long-running services under the YARN umbrella, leveraging it for centralized resource management and operations ([YARN-896] and examples such as HBase, Accumulo or Memcached through Slider). This JIRA is to propose KOYA (Kafka On Yarn), a YARN application master to launch and manage Kafka clusters running on YARN. Brokers will use resources allocated through YARN with support for recovery, monitoring etc. Please see the attachment for more details. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199625#comment-14199625 ] Jun Rao commented on KAFKA-1738: Hmm, that try/catch block is in a while loop. So, the resend should happen when the logic loops back again. Also, when a broker goes down, the controller typically notices it immediately and will remove the corresponding BlockingChannel. When a broker comes back, a new BlockingChannel will be created. It sounds like you see this issue in some broker failure scenarios. Could you describe that a bit more? Partitions for topic not created after restart from forced shutdown --- Key: KAFKA-1738 URL: https://issues.apache.org/jira/browse/KAFKA-1738 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1, 0.8.2 Environment: Linux, 2GB RAM, 2 Core CPU Reporter: Pradeep Attachments: 1738.zip, ServerLogForFailedTopicCreation.txt, ServerLogForFailedTopicCreation.txt, ServerLogForSuccessfulTopicCreation.txt We are using the Kafka topic APIs to create the topic. But in some cases the topic gets created, yet we don't see the partition-specific files, and when a producer/consumer tries to get the topic metadata it fails with an exception. The same happens if one tries to create the topic using the command line. k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException Steps to reproduce - 1. Stop Kafka using kill -9 PID of Kafka. 2. Start Kafka. 3. Create a topic with partition and replication factor of 1. 4. Check the response “Created topic topic_name”. 5. Run the list command to verify it was created. 6. Now check the data directory of Kafka. There will not be any files for the newly created topic. We see issues when we are creating new topics. This happens randomly and we don't know the exact reasons. We see the below logs in the controller at the time of creation of topics which don't have the partition files.
[2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [JobJTopic,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,667] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-03 13:12:50,794] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:DMIPVM,port:9092 (kafka.controller.RequestSendThread)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at kafka.utils.Utils$.read(Utils.scala:381)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2014-11-03 13:12:50,965] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199643#comment-14199643 ] schandr commented on KAFKA-1738: Thank you very much for your response. First, I think the broker is not down, because I am able to telnet to the VM where Kafka is running using host:9092. We have tried several scenarios: 1. Creating a topic through Java code using TopicCommand. 2. Creating a topic in a bash script. In either of the cases above, the topic creation fails randomly. When the bash script is modified to create the topic every 10 minutes, it fails consistently. In the server log, I see that the LeaderAndIsr request fails, which results in the partition log file not getting created. Only ZooKeeper and Kafka are running in the VM where the script was run. Let me know if you need more information, such as the server.properties file. Also, I am able to see the ZK nodes for the controller and broker through an Eclipse ZooKeeper plugin. Partitions for topic not created after restart from forced shutdown --- Key: KAFKA-1738 URL: https://issues.apache.org/jira/browse/KAFKA-1738 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1, 0.8.2 Environment: Linux, 2GB RAM, 2 Core CPU Reporter: Pradeep Attachments: 1738.zip, ServerLogForFailedTopicCreation.txt, ServerLogForFailedTopicCreation.txt, ServerLogForSuccessfulTopicCreation.txt We are using the Kafka topic APIs to create the topic. But in some cases the topic gets created, yet we don't see the partition-specific files, and when a producer/consumer tries to get the topic metadata it fails with an exception. The same happens if one tries to create the topic using the command line. k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException Steps to reproduce - 1. Stop Kafka using kill -9 PID of Kafka. 2. Start Kafka. 3. Create a topic with partition and replication factor of 1. 4. Check the response “Created topic topic_name”. 5. Run the list command to verify it was created. 6. Now check the data directory of Kafka. There will not be any files for the newly created topic. We see issues when we are creating new topics. This happens randomly and we don't know the exact reasons. We see the below logs in the controller at the time of creation of topics which don't have the partition files.
[2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [JobJTopic,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,667] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-03 13:12:50,794] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:DMIPVM,port:9092 (kafka.controller.RequestSendThread)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at kafka.utils.Utils$.read(Utils.scala:381)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
    at
[jira] [Comment Edited] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199643#comment-14199643 ] schandr edited comment on KAFKA-1738 at 11/6/14 2:26 AM: - Thank you very much for your response. First, I think the broker is not down, because I am able to telnet to the VM where Kafka is running using host:9092. We have tried several scenarios: 1. Creating a topic through Java code using TopicCommand. 2. Creating a topic in a bash script. In either of the cases above, the topic creation fails randomly. When the bash script is modified to create the topic every 10 minutes, it fails consistently. But if the same bash script is modified to create a topic every 5 minutes, then alternate topic creations go through. Somehow the 10-minute interval fails consistently. In the server log, I see that the LeaderAndIsr request fails, which results in the partition log file not getting created. Only ZooKeeper and Kafka are running in the VM where the script was run. Let me know if you need more information, such as the server.properties file. Also, I am able to see the ZK nodes for the controller and broker through an Eclipse ZooKeeper plugin. was (Author: schandr): Thank you very much for your response. First, I think the broker is not down, because I am able to telnet to the VM where Kafka is running using host:9092. We have tried several scenarios: 1. Creating a topic through Java code using TopicCommand. 2. Creating a topic in a bash script. In either of the cases above, the topic creation fails randomly. When the bash script is modified to create the topic every 10 minutes, it fails consistently. In the server log, I see that the LeaderAndIsr request fails, which results in the partition log file not getting created. Only ZooKeeper and Kafka are running in the VM where the script was run. Let me know if you need more information, such as the server.properties file. Also, I am able to see the ZK nodes for the controller and broker through an Eclipse ZooKeeper plugin. Partitions for topic not created after restart from forced shutdown --- Key: KAFKA-1738 URL: https://issues.apache.org/jira/browse/KAFKA-1738 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1, 0.8.2 Environment: Linux, 2GB RAM, 2 Core CPU Reporter: Pradeep Attachments: 1738.zip, ServerLogForFailedTopicCreation.txt, ServerLogForFailedTopicCreation.txt, ServerLogForSuccessfulTopicCreation.txt We are using the Kafka topic APIs to create the topic. But in some cases the topic gets created, yet we don't see the partition-specific files, and when a producer/consumer tries to get the topic metadata it fails with an exception. The same happens if one tries to create the topic using the command line. k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException Steps to reproduce - 1. Stop Kafka using kill -9 PID of Kafka. 2. Start Kafka. 3. Create a topic with partition and replication factor of 1. 4. Check the response “Created topic topic_name”. 5. Run the list command to verify it was created. 6. Now check the data directory of Kafka. There will not be any files for the newly created topic. We see issues when we are creating new topics. This happens randomly and we don't know the exact reasons. We see the below logs in the controller at the time of creation of topics which don't have the partition files.
[2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Initializing
[jira] [Commented] (KAFKA-1756) never allow the replica fetch size to be less than the max message size
[ https://issues.apache.org/jira/browse/KAFKA-1756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199656#comment-14199656 ] Jun Rao commented on KAFKA-1756: I am also wondering if we need to make max.message.bytes a topic-level config. This makes replica fetchers, as well as tools like MirrorMaker, harder to configure, since they have to be aware of all the per-topic values. I am not sure there is a strong use case to customize a different max message size per topic. never allow the replica fetch size to be less than the max message size --- Key: KAFKA-1756 URL: https://issues.apache.org/jira/browse/KAFKA-1756 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1, 0.8.2 Reporter: Joe Stein Priority: Blocker Fix For: 0.8.2 There exists a very hazardous scenario where, if max.message.bytes is greater than replica.fetch.max.bytes, the message will never replicate. This will bring the ISR down to 1 (eventually/quickly, once replica.lag.max.messages is reached). If during this window the leader itself goes out of the ISR, then the new leader will commit the last offset it replicated. This is also bad for sync producers with -1 ack, because they will all block (a herd effect caused upstream) in this scenario too. The fix here is two-fold: 1) when setting max.message.bytes using kafka-topics we must first check each and every broker (which will need some thought about how to do this because of the TopicCommand ZK notification) that max.message.bytes <= replica.fetch.max.bytes, and if it is NOT then DO NOT create the topic; 2) if you change this in server.properties then the broker should not start if max.message.bytes > replica.fetch.max.bytes. This does beg the question/issue somewhat about centralizing certain/some/all configurations so that inconsistencies do not occur (where broker 1 has max.message.bytes > replica.fetch.max.bytes but broker 2 has max.message.bytes <= replica.fetch.max.bytes because of an error in the properties). I do not want to conflate this ticket, but I think it is worth mentioning/bringing up here as it is a good example of where it could make sense. I set this as BLOCKER for 0.8.2-beta because we did so much work to enable consistency vs availability and 0 data loss; this corner case should be fixed as part of 0.8.2-final. Also, I could go one step further (though I would not consider this part a blocker for 0.8.2, but I am interested in what other folks think) about a consumer fetch size check, so that if the message max is increased, messages will no longer be consumed (since the consumer fetch max would be < max.message.bytes). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199662#comment-14199662 ] Jun Rao commented on KAFKA-1738: What's your value for controller.socket.timeout.ms? Partitions for topic not created after restart from forced shutdown --- Key: KAFKA-1738 URL: https://issues.apache.org/jira/browse/KAFKA-1738 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1, 0.8.2 Environment: Linux, 2GB RAM, 2 Core CPU Reporter: Pradeep Attachments: 1738.zip, ServerLogForFailedTopicCreation.txt, ServerLogForFailedTopicCreation.txt, ServerLogForSuccessfulTopicCreation.txt We are using the Kafka topic APIs to create the topic. But in some cases the topic gets created, yet we don't see the partition-specific files, and when a producer/consumer tries to get the topic metadata it fails with an exception. The same happens if one tries to create the topic using the command line. k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException Steps to reproduce - 1. Stop Kafka using kill -9 PID of Kafka. 2. Start Kafka. 3. Create a topic with partition and replication factor of 1. 4. Check the response “Created topic topic_name”. 5. Run the list command to verify it was created. 6. Now check the data directory of Kafka. There will not be any files for the newly created topic. We see issues when we are creating new topics. This happens randomly and we don't know the exact reasons. We see the below logs in the controller at the time of creation of topics which don't have the partition files.
[2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [JobJTopic,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,667] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-03 13:12:50,794] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:DMIPVM,port:9092 (kafka.controller.RequestSendThread)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at kafka.utils.Utils$.read(Utils.scala:381)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2014-11-03 13:12:50,965] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:43;ClientId:id_0-host_null-port_9092;AliveBrokers:id:0,host:DMIPVM,port:9092;PartitionState:[JobJTopic,0] -> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:DMIPVM,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:97)
[jira] [Commented] (KAFKA-1753) add --decommission-broker option
[ https://issues.apache.org/jira/browse/KAFKA-1753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199670#comment-14199670 ] Jun Rao commented on KAFKA-1753: Is the idea just moving all data off the decommissioned broker using the partition reassignment tool under the covers? add --decommission-broker option Key: KAFKA-1753 URL: https://issues.apache.org/jira/browse/KAFKA-1753 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1753) add --decommission-broker option
[ https://issues.apache.org/jira/browse/KAFKA-1753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199675#comment-14199675 ] Joe Stein commented on KAFKA-1753: -- yes add --decommission-broker option Key: KAFKA-1753 URL: https://issues.apache.org/jira/browse/KAFKA-1753 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
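Assuming the option does ride on the existing partition reassignment tool, the plan that tool consumes is the JSON passed to kafka-reassign-partitions.sh via --reassignment-json-file; a --decommission-broker option would presumably generate such a plan automatically, with the decommissioned broker's id removed from every replica list. Topic names and broker ids below are illustrative:
{code}
{"version": 1,
 "partitions": [
   {"topic": "my-topic", "partition": 0, "replicas": [2, 3]},
   {"topic": "my-topic", "partition": 1, "replicas": [3, 4]}
 ]}
{code}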
[jira] [Commented] (KAFKA-1756) never allow the replica fetch size to be less than the max message size
[ https://issues.apache.org/jira/browse/KAFKA-1756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199674#comment-14199674 ] Gwen Shapira commented on KAFKA-1756: - Being able to modify max.message.size without bouncing the entire cluster is the main benefit of the topic-level config. never allow the replica fetch size to be less than the max message size --- Key: KAFKA-1756 URL: https://issues.apache.org/jira/browse/KAFKA-1756 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1, 0.8.2 Reporter: Joe Stein Priority: Blocker Fix For: 0.8.2 There exists a very hazardous scenario: if max.message.bytes is greater than replica.fetch.max.bytes, the message will never replicate. This will bring the ISR down to 1 (eventually/quickly, once replica.lag.max.messages is reached). If during this window the leader itself goes out of the ISR, then the new leader will commit the last offset it replicated. This is also bad for sync producers with -1 ack, because they will all block (herd effect caused upstream) in this scenario too. The fix here is twofold:
1) when setting max.message.bytes using kafka-topics we must first check each and every broker (which will need some thought about how to do this because of the TopicCommand ZK notification) that max.message.bytes <= replica.fetch.max.bytes, and if it is NOT then DO NOT create the topic;
2) if you change this in server.properties, then the broker should not start if max.message.bytes > replica.fetch.max.bytes.
This does beg the question/issue about centralizing certain/some/all configurations so that inconsistencies do not occur (where broker 1 has max.message.bytes > replica.fetch.max.bytes but broker 2 has max.message.bytes <= replica.fetch.max.bytes because of an error in the properties). I do not want to conflate this ticket, but I think it is worth mentioning here as this is a good example of where it could make sense. I set this as BLOCKER for 0.8.2-beta because we did so much work to enable consistency vs. availability and 0 data loss; this corner case should be fixed in 0.8.2-final. Also, I could go one step further (though I would not consider this part a blocker for 0.8.2, but I am interested in what other folks think) about a consumer fetch size check too, since if the message max is increased, messages will no longer be consumed (the consumer fetch max would be < max.message.bytes). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
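To make check (2) concrete, a minimal broker-startup sanity check might look like the sketch below. This is illustrative only, not actual Kafka source; the property names follow the ticket text, and both values are assumed to be present in the supplied properties.
{code}
import java.util.Properties;

public class BrokerConfigCheck {
    static void validate(Properties serverProps) {
        int maxMessageBytes = Integer.parseInt(serverProps.getProperty("max.message.bytes"));
        int replicaFetchMaxBytes = Integer.parseInt(serverProps.getProperty("replica.fetch.max.bytes"));
        if (maxMessageBytes > replicaFetchMaxBytes) {
            // Fail fast: a message larger than the replica fetch size can never
            // be replicated, which silently shrinks the ISR as described above.
            throw new IllegalArgumentException("max.message.bytes (" + maxMessageBytes
                    + ") must be <= replica.fetch.max.bytes (" + replicaFetchMaxBytes + ")");
        }
    }
}
{code}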
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199678#comment-14199678 ] schandr commented on KAFKA-1738:
controller.socket.timeout.ms=9
controller.message.queue.size=20
auto.leader.rebalance.enable=true
queued.max.requests=20
Partitions for topic not created after restart from forced shutdown --- Key: KAFKA-1738 URL: https://issues.apache.org/jira/browse/KAFKA-1738 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1, 0.8.2 Environment: Linux, 2GB RAM, 2 Core CPU Reporter: Pradeep Attachments: 1738.zip, ServerLogForFailedTopicCreation.txt, ServerLogForFailedTopicCreation.txt, ServerLogForSuccessfulTopicCreation.txt We are using the Kafka topic APIs to create topics. In some cases the topic gets created but we don't see the partition-specific files, and when a producer/consumer then tries to fetch the topic metadata, it fails with an exception. The same happens if one tries to create the topic using the command line. k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException
Steps to reproduce:
1. Stop Kafka using kill -9 <PID of Kafka>.
2. Start Kafka.
3. Create a topic with a partition count and replication factor of 1.
4. Check the response: “Created topic topic_name”.
5. Run the list command to verify it was created.
6. Now check the data directory of Kafka. There are no files for the newly created topic.
We see these issues when creating new topics. This happens randomly and we don't know the exact reason. We see the logs below in the controller at the time of creation of the topics that end up without partition files.
[2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [JobJTopic,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,667] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-03 13:12:50,794] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:DMIPVM,port:9092 (kafka.controller.RequestSendThread)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2014-11-03 13:12:50,965] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:43;ClientId:id_0-host_null-port_9092;AliveBrokers:id:0,host:DMIPVM,port:9092;PartitionState:[JobJTopic,0] - (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:DMIPVM,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199702#comment-14199702 ] schandr commented on KAFKA-1738: One more observation in the server log. Here is the TRACE statement for the send request; it gets logged when channel.send(request) is invoked in the RequestSendThread (the send method invokes the writeCompletely method in the Send class).
0:31:12,440] TRACE 131 bytes written. (kafka.network.BoundedByteBufferSend)
[2014-11-05 10:31:12,440] TRACE 131 bytes written. (kafka.network.BoundedByteBufferSend)
If the request had been resent, the same statement should have been logged again (per the channel.send(request) code in the RequestSendThread), but it was not. Partitions for topic not created after restart from forced shutdown --- Key: KAFKA-1738 URL: https://issues.apache.org/jira/browse/KAFKA-1738 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1, 0.8.2 Environment: Linux, 2GB RAM, 2 Core CPU Reporter: Pradeep Attachments: 1738.zip, ServerLogForFailedTopicCreation.txt, ServerLogForFailedTopicCreation.txt, ServerLogForSuccessfulTopicCreation.txt We are using the Kafka topic APIs to create topics. In some cases the topic gets created but we don't see the partition-specific files, and when a producer/consumer then tries to fetch the topic metadata, it fails with an exception. The same happens if one tries to create the topic using the command line. k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException
Steps to reproduce:
1. Stop Kafka using kill -9 <PID of Kafka>.
2. Start Kafka.
3. Create a topic with a partition count and replication factor of 1.
4. Check the response: “Created topic topic_name”.
5. Run the list command to verify it was created.
6. Now check the data directory of Kafka. There are no files for the newly created topic.
We see these issues when creating new topics. This happens randomly and we don't know the exact reason. We see the logs below in the controller at the time of creation of the topics that end up without partition files.
[2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [JobJTopic,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,667] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-03 13:12:50,794] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:DMIPVM,port:9092 (kafka.controller.RequestSendThread)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:381)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2014-11-03 13:12:50,965] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send
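For anyone chasing these TRACE lines, they come from kafka.network.BoundedByteBufferSend, so the relevant loggers need to run at TRACE level. An illustrative log4j.properties fragment (standard log4j syntax; logger names taken from the log output quoted in this ticket):
{code}
# Surface the "N bytes written" lines from the controller's network layer.
log4j.logger.kafka.network=TRACE
log4j.logger.kafka.controller=TRACE
{code}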
[jira] [Comment Edited] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199702#comment-14199702 ] schandr edited comment on KAFKA-1738 at 11/6/14 3:32 AM: - One more observation in the server log. Here is the TRACE statement for the send request; it gets logged when channel.send(request) is invoked in the RequestSendThread (the send method invokes the writeCompletely method in the Send class).
0:31:12,440] TRACE 131 bytes written. (kafka.network.BoundedByteBufferSend)
[2014-11-05 10:31:12,440] TRACE 131 bytes written. (kafka.network.BoundedByteBufferSend)
If the request had been resent, the same statement should have been logged again (per the channel.send(request) code in the RequestSendThread), but it was not. And it could be a race condition between the LeaderAndIsr request resend and the UpdateMetadata request, which could lead to channel.receive throwing an EOFException. Once topic creation fails, no exceptions are thrown back for us to catch and retry. Is there any workaround for this issue?
was (Author: schandr): One more observation in the server log. Here is the TRACE statement for the send request; it gets logged when channel.send(request) is invoked in the RequestSendThread (the send method invokes the writeCompletely method in the Send class).
0:31:12,440] TRACE 131 bytes written. (kafka.network.BoundedByteBufferSend)
[2014-11-05 10:31:12,440] TRACE 131 bytes written. (kafka.network.BoundedByteBufferSend)
If the request had been resent, the same statement should have been logged again (per the channel.send(request) code in the RequestSendThread), but it was not. Partitions for topic not created after restart from forced shutdown --- Key: KAFKA-1738 URL: https://issues.apache.org/jira/browse/KAFKA-1738 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1, 0.8.2 Environment: Linux, 2GB RAM, 2 Core CPU Reporter: Pradeep Attachments: 1738.zip, ServerLogForFailedTopicCreation.txt, ServerLogForFailedTopicCreation.txt, ServerLogForSuccessfulTopicCreation.txt We are using the Kafka topic APIs to create topics. In some cases the topic gets created but we don't see the partition-specific files, and when a producer/consumer then tries to fetch the topic metadata, it fails with an exception. The same happens if one tries to create the topic using the command line. k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException
Steps to reproduce:
1. Stop Kafka using kill -9 <PID of Kafka>.
2. Start Kafka.
3. Create a topic with a partition count and replication factor of 1.
4. Check the response: “Created topic topic_name”.
5. Run the list command to verify it was created.
6. Now check the data directory of Kafka. There are no files for the newly created topic.
We see these issues when creating new topics. This happens randomly and we don't know the exact reason. We see the logs below in the controller at the time of creation of the topics that end up without partition files.
[2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [JobJTopic,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,667] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
Re: No longer supporting Java 6, if? when?
Mostly converted to 1.7 now; this would be welcomed in order to get at any new features. On Wed Nov 05 2014 at 7:32:55 PM Joe Stein joe.st...@stealth.ly wrote: This has been coming up in a lot of projects, and for other reasons too, so I wanted to kick off the discussion about if/when we end support for Java 6. Besides any API we may want to use in >= 7, we also currently compile our binaries for 6 for release. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop ***/
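For context, a small illustrative snippet (not Kafka code) of two language features a 1.7 baseline unlocks, try-with-resources and the diamond operator:
{code}
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class Java7Features {
    public static List<String> readLines(String path) throws IOException {
        List<String> lines = new ArrayList<>(); // diamond operator: Java 7+
        // try-with-resources (Java 7+): the reader is closed automatically,
        // even if readLine() throws.
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }
}
{code}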
[jira] [Commented] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199817#comment-14199817 ] Vishal commented on KAFKA-1745: --- [~junrao] Sorry, I should have been more clear. [~ewencp] I checked and I don't think I'm leaking producers. Anyway, this is a sample test case which can replicate the problem (check lsof every 10 seconds to notice the increase in KQUEUEs and PIPEs even though the producer object is being reused):
{code}
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Test {
    private static Queue<Producer<String, String>> producerPool =
            new ConcurrentLinkedQueue<Producer<String, String>>();
    private static ProducerConfig config;

    static {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        config = new ProducerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true); // idle pool threads die after 5 seconds

        // Reused as a plain Runnable by the pool (never started as a Thread):
        // take a producer from the pool, send, and return it to the pool.
        Thread run = new Thread(new Runnable() {
            @Override
            public void run() {
                Producer<String, String> producer = producerPool.poll();
                if (producer == null) {
                    producer = new Producer<String, String>(config);
                }
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("SaSMQ", "0", "test");
                producer.send(data);
                producerPool.add(producer);
            }
        });

        while (true) { // to make sure that the main program does not terminate
            for (int i = 0; i < 100; i++) {
                tpe.submit(run);
            }
            Thread.sleep(10000); // 10 seconds, so the keepalive time is exceeded and pool threads are cleared
        }
    }
}
{code}
Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared. - Key: KAFKA-1745 URL: https://issues.apache.org/jira/browse/KAFKA-1745 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Environment: Mac OS Mavericks Reporter: Vishal Priority: Critical Hi, I'm using the Java client API for Kafka. I wanted to send data to Kafka using a producer pool, as I'm using a sync producer. The thread that sends the data comes from a thread pool that grows and shrinks depending on usage. So, when I try to send data from one thread, 1 KQUEUE and 2 PIPEs are created (got this info by using lsof). If I keep using the same thread it's fine, but when a new thread sends data to Kafka (using producer.send()), a new KQUEUE and 2 PIPEs are created. This is okay, but when the thread is cleared from the thread pool and a new thread is created, new KQUEUEs and PIPEs are created again. The problem is that the old ones are not getting destroyed and they show up as open files. This is causing a major problem, as the number of open files keeps increasing and does not decrease. Please suggest any solutions. FYI, the number of TCP connections established from the producer system to the Kafka broker remains constant throughout. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
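One thing the test above never does is close its pooled producers; per the earlier discussion, closing makes a producer unusable for further sends, but the pool can at least be drained when the JVM exits. A hypothetical cleanup fragment, not part of the reporter's code (it assumes the same producerPool field and could be added at the start of main):
{code}
// Hypothetical cleanup, not in the original test: drain the pool on JVM exit
// and close each producer so its sockets and selector resources are released.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    @Override
    public void run() {
        Producer<String, String> producer;
        while ((producer = producerPool.poll()) != null) {
            producer.close();
        }
    }
}));
{code}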
[jira] [Comment Edited] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199817#comment-14199817 ] Vishal edited comment on KAFKA-1745 at 11/6/14 5:25 AM: [~junrao] Sorry, I should have been more clear. [~ewencp] I checked and I don't think I'm leaking producers. Anyway, this is a sample test case which can replicate the problem (check lsof every 10 seconds to notice the increase in KQUEUEs and PIPEs even though the producer object is being reused):
{code}
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Test {
    private static Queue<Producer<String, String>> producerPool =
            new ConcurrentLinkedQueue<Producer<String, String>>();
    private static ProducerConfig config;

    static {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        config = new ProducerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true);

        Thread run = new Thread(new Runnable() {
            @Override
            public void run() {
                Producer<String, String> producer = producerPool.poll();
                if (producer == null) {
                    producer = new Producer<String, String>(config);
                }
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("SaSMQ", "0", "test");
                producer.send(data);
                producerPool.add(producer);
            }
        });

        while (true) { // to make sure that the main program does not terminate
            for (int i = 0; i < 100; i++) {
                tpe.submit(run);
            }
            Thread.sleep(10000); // 10 seconds, so the keepalive time is exceeded and pool threads are cleared
        }
    }
}
{code}
Though the KQUEUEs and PIPEs do get cleared after some time, why does a new one have to be created when a new thread accesses the producer object?
was (Author: vishal m): [~junrao] Sorry, I should have been more clear. [~ewencp] I checked and I don't think I'm leaking producers. Anyway, this is a sample test case which can replicate the problem (check lsof every 10 seconds to notice the increase in KQUEUEs and PIPEs even though the producer object is being reused):
{code}
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Test {
    private static Queue<Producer<String, String>> producerPool =
            new ConcurrentLinkedQueue<Producer<String, String>>();
    private static ProducerConfig config;

    static {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        config = new ProducerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true);

        Thread run = new Thread(new Runnable() {
            @Override
            public void run() {
                Producer<String, String> producer = producerPool.poll();
                if (producer == null) {
                    producer = new Producer<String, String>(config);
                }
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("SaSMQ", "0", "test");
                producer.send(data);
                producerPool.add(producer);
            }
        });

        while (true) { // to make sure that the main program does not terminate
            for (int i = 0; i < 100; i++) {
                tpe.submit(run);
            }
            Thread.sleep(10000); // 10 seconds, so that the keepalive time is exceeded by the thread pool
[jira] [Commented] (KAFKA-1754) KOYA - Kafka on YARN
[ https://issues.apache.org/jira/browse/KAFKA-1754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199918#comment-14199918 ] Arun C Murthy commented on KAFKA-1754: -- bq. My main concern is whether it's possible to prevent YARN from spawning a new container when a broker goes down. [~gwenshap] - YARN doesn't spawn a new container on its own... the Kafka-AM would get a notification that a particular broker (YARN container) has failed. The Kafka-AM can then decide when/where to request a new container and launch a new broker. KOYA - Kafka on YARN Key: KAFKA-1754 URL: https://issues.apache.org/jira/browse/KAFKA-1754 Project: Kafka Issue Type: New Feature Reporter: Thomas Weise Attachments: DT-KOYA-Proposal- JIRA.pdf YARN (Hadoop 2.x) has enabled clusters to be used for a variety of workloads, emerging as a distributed operating system for big data applications. Initiatives are under way to bring long-running services under the YARN umbrella, leveraging it for centralized resource management and operations ([YARN-896] and examples such as HBase, Accumulo or Memcached through Slider). This JIRA proposes KOYA (Kafka On YARN), a YARN application master to launch and manage Kafka clusters running on YARN. Brokers will use resources allocated through YARN, with support for recovery, monitoring, etc. Please see the attachment for more details. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
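To make the division of responsibility concrete, here is a hedged Java sketch of an AM-side callback using the Hadoop 2.x AMRMClientAsync API; the class name, broker sizing, and relaunch policy are assumptions, not taken from the KOYA proposal.
{code}
import java.util.List;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

// YARN notifies the AM that a container finished; requesting a replacement
// is entirely the AM's decision, as described in the comment above.
public class BrokerRecoveryHandler implements AMRMClientAsync.CallbackHandler {
    private AMRMClientAsync<ContainerRequest> amRMClient; // wired up at AM start

    @Override
    public void onContainersCompleted(List<ContainerStatus> statuses) {
        for (ContainerStatus status : statuses) {
            // The AM could equally decide NOT to relaunch (e.g. during scale-down).
            Resource capability = Resource.newInstance(4096, 2); // hypothetical broker sizing
            amRMClient.addContainerRequest(
                    new ContainerRequest(capability, null, null, Priority.newInstance(0)));
        }
    }

    @Override public void onContainersAllocated(List<Container> containers) { /* launch broker */ }
    @Override public void onShutdownRequest() { }
    @Override public void onNodesUpdated(List<NodeReport> updatedNodes) { }
    @Override public float getProgress() { return 0; }
    @Override public void onError(Throwable e) { }
}
{code}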
[jira] [Commented] (KAFKA-1754) KOYA - Kafka on YARN
[ https://issues.apache.org/jira/browse/KAFKA-1754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14199920#comment-14199920 ] Arun C Murthy commented on KAFKA-1754: -- [~thw] - please feel free to reach out to either yarn-dev@ or me personally if you need any help. We'd love to see this supported well. Thanks! KOYA - Kafka on YARN Key: KAFKA-1754 URL: https://issues.apache.org/jira/browse/KAFKA-1754 Project: Kafka Issue Type: New Feature Reporter: Thomas Weise Attachments: DT-KOYA-Proposal- JIRA.pdf YARN (Hadoop 2.x) has enabled clusters to be used for a variety of workloads, emerging as a distributed operating system for big data applications. Initiatives are under way to bring long-running services under the YARN umbrella, leveraging it for centralized resource management and operations ([YARN-896] and examples such as HBase, Accumulo or Memcached through Slider). This JIRA proposes KOYA (Kafka On YARN), a YARN application master to launch and manage Kafka clusters running on YARN. Brokers will use resources allocated through YARN, with support for recovery, monitoring, etc. Please see the attachment for more details. -- This message was sent by Atlassian JIRA (v6.3.4#6332)