[jira] [Commented] (KAFKA-1004) Handle topic event for trivial whitelist topic filters
[ https://issues.apache.org/jira/browse/KAFKA-1004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13742351#comment-13742351 ]

Jun Rao commented on KAFKA-1004:

Thanks for the patch. That looks good. However, shouldn't we just remove requiresTopicEventWatcher from TopicFilter completely?

Handle topic event for trivial whitelist topic filters

Key: KAFKA-1004
URL: https://issues.apache.org/jira/browse/KAFKA-1004
Project: Kafka
Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Fix For: 0.7, 0.8.1
Attachments: KAFKA-1004.v1.patch

Today the consumer's TopicEventWatcher is not subscribed for trivial whitelist topic names. Hence, if a topic is not yet registered in ZooKeeper when the consumer starts, its creation later will not trigger a rebalance of consumers, and the topic will not be consumed even though it is in the whitelist. A proposed fix is to always subscribe the TopicEventWatcher for all whitelist consumers.

--
This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators. For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-1004) Handle topic event for trivial whitelist topic filters
[ https://issues.apache.org/jira/browse/KAFKA-1004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13742386#comment-13742386 ]

Jun Rao commented on KAFKA-1004:

Looks good to me. Joel, do you want to take another look?

Handle topic event for trivial whitelist topic filters

Key: KAFKA-1004
URL: https://issues.apache.org/jira/browse/KAFKA-1004
Project: Kafka
Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Fix For: 0.7, 0.8.1
Attachments: KAFKA-1004.v1.patch, KAFKA-1004.v2.patch

Today the consumer's TopicEventWatcher is not subscribed for trivial whitelist topic names. Hence, if a topic is not yet registered in ZooKeeper when the consumer starts, its creation later will not trigger a rebalance of consumers, and the topic will not be consumed even though it is in the whitelist. A proposed fix is to always subscribe the TopicEventWatcher for all whitelist consumers.
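To illustrate why the watcher matters even for "trivial" whitelists, here is a minimal sketch (hypothetical class and method names, not Kafka's actual TopicFilter): a comma-separated list of plain topic names still behaves like a regex alternation, so a topic created after consumer startup can match it, and only a topic-event watcher can notice that.

```java
import java.util.regex.Pattern;

// Hypothetical sketch: a "trivial" whitelist (plain names, no wildcards) is
// still a pattern that future topics may match, which is why the consumer must
// subscribe a TopicEventWatcher even when the whitelist contains no wildcards.
public class WhitelistFilter {
    private final Pattern pattern;

    public WhitelistFilter(String rawWhitelist) {
        // Commas separate alternatives, mirroring the consumer's whitelist syntax.
        this.pattern = Pattern.compile(rawWhitelist.replace(',', '|'));
    }

    public boolean isTopicAllowed(String topic) {
        return pattern.matcher(topic).matches();
    }

    public static void main(String[] args) {
        WhitelistFilter trivial = new WhitelistFilter("orders,payments");
        System.out.println(trivial.isTopicAllowed("orders"));   // true
        System.out.println(trivial.isTopicAllowed("orders2"));  // false: exact match only
    }
}
```

If "orders" does not exist yet when the consumer starts, it will still match this filter once created; without the watcher subscription, the consumer would never rebalance to pick it up.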
[jira] [Commented] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception
[ https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13742837#comment-13742837 ]

Jun Rao commented on KAFKA-992:
---

Thanks for patch v14. Committed to 0.8.

Double Check on Broker Registration to Avoid False NodeExist Exception

Key: KAFKA-992
URL: https://issues.apache.org/jira/browse/KAFKA-992
Project: Kafka
Issue Type: Bug
Reporter: Neha Narkhede
Assignee: Guozhang Wang
Attachments: KAFKA-992.v10.patch, KAFKA-992.v11.patch, KAFKA-992.v12.patch, KAFKA-992.v13.patch, KAFKA-992.v14.patch, KAFKA-992.v1.patch, KAFKA-992.v2.patch, KAFKA-992.v3.patch, KAFKA-992.v4.patch, KAFKA-992.v5.patch, KAFKA-992.v6.patch, KAFKA-992.v7.patch, KAFKA-992.v8.patch, KAFKA-992.v9.patch

In ZooKeeper, session expiration and ephemeral node deletion are not an atomic operation. The side effect of this behavior in Kafka, in certain corner cases, is that ephemeral nodes can be lost even though the session has not expired. The sequence of events that can lead to lost ephemeral nodes is as follows:

1. The session expires on the client. The client assumes its ephemeral nodes are deleted, so it establishes a new session with ZooKeeper and tries to re-create the ephemeral nodes.
2. However, when it tries to re-create an ephemeral node, ZooKeeper returns a NodeExists error code. This is legitimate during a session disconnect event (zkclient automatically retries the operation, which raises the NodeExists error). Also, by design the Kafka server never has multiple ZooKeeper clients create the same ephemeral node, so the Kafka server treats the NodeExists as normal.
3. However, a few seconds later ZooKeeper deletes that ephemeral node. So from the client's perspective, even though the client has a new valid session, its ephemeral node is gone.

This behavior is triggered by very long fsync operations on the ZooKeeper leader. When the leader wakes up from such a long fsync, it has several sessions to expire, and the time between session expiration and ephemeral node deletion is magnified. Between these two operations, a ZooKeeper client can issue an ephemeral node creation that appears to have succeeded, but the leader later deletes the node, leading to permanent ephemeral node loss from the client's perspective.

Thread from the ZooKeeper mailing list: http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A201307%20+page:1+mid:zma242a2qgp6gxvx+state:results
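The "double check" in the ticket title can be sketched roughly as follows (a hypothetical simulation, with a HashMap standing in for ZooKeeper; the real fix works against the ZooKeeper client API): on a NodeExists error, re-read the node, and only back off and retry when the stored data was written by this same broker in an earlier session, since ZooKeeper will eventually delete that stale node.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of the double-check-and-retry idea. A HashMap stands
// in for ZooKeeper; removing our own stale entry simulates ZooKeeper's delayed
// deletion of an ephemeral node left over from an expired session.
public class BrokerRegistration {
    private final Map<String, String> fakeZk = new HashMap<>();

    public BrokerRegistration(Map<String, String> preExisting) {
        fakeZk.putAll(preExisting);
    }

    public boolean register(String path, String myData, int maxRetries) {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            String stored = fakeZk.putIfAbsent(path, myData);
            if (stored == null) return true;          // node created successfully
            if (!stored.equals(myData)) return false; // genuinely owned by another broker
            fakeZk.remove(path);                      // our stale node: simulate ZK's delayed cleanup, then retry
        }
        return false;
    }

    public static void main(String[] args) {
        BrokerRegistration reg = new BrokerRegistration(new HashMap<>());
        System.out.println(reg.register("/brokers/ids/3", "broker-3", 3)); // true: fresh path
    }
}
```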
[jira] [Commented] (KAFKA-649) Cleanup log4j logging
[ https://issues.apache.org/jira/browse/KAFKA-649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13743888#comment-13743888 ]

Jun Rao commented on KAFKA-649:
---

Thanks for patch v6. Committed to 0.8.

Cleanup log4j logging

Key: KAFKA-649
URL: https://issues.apache.org/jira/browse/KAFKA-649
Project: Kafka
Issue Type: Improvement
Affects Versions: 0.8
Reporter: Jay Kreps
Assignee: Jun Rao
Priority: Blocker
Attachments: kafka-649_extra.patch, kafka-649.patch, KAFKA-649.v3.patch, KAFKA-649.v4.patch, KAFKA-649.v5.patch, KAFKA-649.v6.patch

Review the logs and do the following:
1. Fix confusing or duplicative messages.
2. Assess that messages are at the right level (TRACE/DEBUG/INFO/WARN/ERROR).

It would also be nice to add a log4j logger for the request logging (i.e. the access log) and another for the controller state change log, since these really have their own use cases.
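Separating the access log and the state change log from the main log is just a log4j configuration concern. A hypothetical log4j.properties fragment (logger and appender names are illustrative, not taken from the ticket) might look like:

```properties
# Hypothetical sketch: give the request (access) log its own logger and
# appender, so it can be enabled at TRACE without flooding the main log.
log4j.logger.kafka.request.logger=TRACE, requestAppender
log4j.additivity.kafka.request.logger=false

# Same idea for the controller state change log.
log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.additivity.state.change.logger=false

log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.File=logs/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.stateChangeAppender.File=logs/state-change.log
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
```

Setting additivity to false keeps these messages out of the root logger's appenders, which is the point of giving them their own use-case-specific logs.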
[jira] [Commented] (KAFKA-990) Fix ReassignPartitionCommand and improve usability
[ https://issues.apache.org/jira/browse/KAFKA-990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13743930#comment-13743930 ]

Jun Rao commented on KAFKA-990:
---

Thanks for patch v2. A few more comments.

2.1 I think it's better to guard this in the command line. The issue is that if a user provides both options, it's not clear which one takes precedence.
2.2 In that case, we should make sure that brokerList is a mandatory field (like zkConnect).
30. KafkaController.initializeAndMaybeTriggerPartitionReassignment(): The following comment is weird: // need to call method
31. Related to Swapnil's comment in #11: currently the tool finishes as soon as the ZK path is created. It would be useful to add an option to check the state of partition reassignment, so that we know either that all assignments have completed or which partitions are still remaining.

Fix ReassignPartitionCommand and improve usability

Key: KAFKA-990
URL: https://issues.apache.org/jira/browse/KAFKA-990
Project: Kafka
Issue Type: Bug
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
Attachments: KAFKA-990-v1.patch, KAFKA-990-v1-rebased.patch, KAFKA-990-v2.patch

1. The tool does not register an IsrChangeListener on controller failover.
2. There is a race condition where the previous listener can fire on controller failover while the replicas are already in the ISR. Even after re-registering the ISR listener after failover, it will never be triggered.
3. The input to the tool is a static list, which is very hard to use. To improve this, as a first step the tool should take a list of topics and a list of brokers to assign to, and then generate the reassignment plan.
[jira] [Resolved] (KAFKA-1017) High number of open file handles in 0.8 producer
[ https://issues.apache.org/jira/browse/KAFKA-1017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-1017.

Resolution: Fixed
Fix Version/s: 0.8

Good patch. +1. Committed to 0.8.

High number of open file handles in 0.8 producer

Key: KAFKA-1017
URL: https://issues.apache.org/jira/browse/KAFKA-1017
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.8
Reporter: Swapnil Ghike
Assignee: Swapnil Ghike
Fix For: 0.8
Attachments: kafka-1017.patch

Reported by Jun Rao: For over-partitioned topics, each broker can be the leader for at least one partition. In the producer, we randomly select a partition to send the data to, so pretty soon each producer will have established a connection to each of the n brokers. Effectively, we increased the number of socket connections by a factor of n compared to 0.7. The increased number of socket connections increases the number of open file handles, which can come pretty close to the OS limit.
[jira] [Commented] (KAFKA-955) After a leader change, messages sent with ack=0 are lost
[ https://issues.apache.org/jira/browse/KAFKA-955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13747692#comment-13747692 ]

Jun Rao commented on KAFKA-955:
---

Magnus, thanks for your comment. What you suggested is interesting and could be a more effective way of communicating between the producer and the broker. It does require that the producer be able to receive requests initiated by the broker. We do plan to make the producer-side processing selector based for efficiency reasons; however, this will be a post-0.8 item. We could consider your suggestion then.

Regarding your concern about dropped messages, my take is the following. If a client chooses not to receive an ack, it probably means that losing a few batches of messages is not that important. If a client does care about data loss, it can choose an ack of 1 or -1. The throughput will be lower, but there are other ways to improve it (e.g., using a larger batch size and/or more producer instances).

Guozhang, patch v3 looks good to me overall. A few more comments:

30. SyncProducerTest.testMessagesizeTooLargeWithAckZero(): You hardcoded the sleep to 500ms. Could you change it to the waitUntil style of wait, so that the test can finish early if the conditions have been met?
31. KafkaApis.handleProducerRequest(): The logging should probably be at debug level, since this doesn't indicate an error at the broker. It's really an error for the client.

After a leader change, messages sent with ack=0 are lost

Key: KAFKA-955
URL: https://issues.apache.org/jira/browse/KAFKA-955
Project: Kafka
Issue Type: Bug
Reporter: Jason Rosenberg
Assignee: Guozhang Wang
Attachments: KAFKA-955.v1.patch, KAFKA-955.v1.patch, KAFKA-955.v2.patch, KAFKA-955.v3.patch

If the leader changes for a partition, and a producer is sending messages with ack=0, then messages will be lost, since the producer has no active way of knowing that the leader has changed until its next metadata refresh. The broker receiving the message, which is no longer the leader, logs a message like this:

Produce request with correlation id 7136261 from client on partition [mytopic,0] failed due to Leader not local for partition [mytopic,0] on broker 508818741

This is exacerbated by the controlled shutdown mechanism, which forces an immediate leader change. A possible solution would be for a broker that receives a message for a topic it is no longer the leader for (and if the ack level is 0) to silently forward the message to the current leader.
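The waitUntil style asked for in comment 30 can be sketched as a small helper (hypothetical names, not Kafka's actual TestUtils): poll the condition and return as soon as it holds, instead of sleeping for a fixed 500 ms.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a waitUntil-style test helper: polls the condition
// until it holds or the timeout elapses, so a test finishes early when the
// condition is already (or quickly becomes) true.
public final class TestUtils {
    private TestUtils() {}

    public static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) return false; // timed out
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

A test would then write, e.g., `waitUntil(() -> broker.receivedRequests() > 0, 5000, 50)` (method name illustrative) instead of a fixed sleep.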
[jira] [Commented] (KAFKA-975) Leader not local for partition when partition is leader (kafka.common.NotLeaderForPartitionException)
[ https://issues.apache.org/jira/browse/KAFKA-975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13748630#comment-13748630 ]

Jun Rao commented on KAFKA-975:
---

Could you describe how to reproduce the issue?

Leader not local for partition when partition is leader (kafka.common.NotLeaderForPartitionException)

Key: KAFKA-975
URL: https://issues.apache.org/jira/browse/KAFKA-975
Project: Kafka
Issue Type: Bug
Components: replication
Affects Versions: 0.8
Environment: centos 6.4
Reporter: Dan Swanson
Assignee: Neha Narkhede

I have a two-server Kafka cluster (dev003 and dev004). I am following the example from this URL, but using two servers with a single Kafka instance each instead of one server with two instances:
http://www.michael-noll.com/blog/2013/03/13/running-a-multi-broker-apache-kafka-cluster-on-a-single-node/

Using the following trunk version:

commit c27c768463a5dc6be113f2e5b3e00bf8d9d9d602
Author: David Arthur mum...@gmail.com
Date: Thu Jul 11 15:34:57 2013 -0700
KAFKA-852, remove clientId from Offset{Fetch,Commit}Response. Reviewed by Jay.

[2013-07-16 10:56:50,279] INFO [Kafka Server 3], started (kafka.server.KafkaServer)

dan@linux-rr29:~/git-data/kafka-current-src$ bin/kafka-topics.sh --zookeeper dev003:2181 --create --topic dadj1 --partitions 1 --replication-factor 2 2>/dev/null
Created topic dadj1.

[2013-07-16 10:56:57,946] INFO [Replica Manager on Broker 3]: Handling LeaderAndIsr request Name:LeaderAndIsrRequest;Version:0;Controller:4;ControllerEpoch:19;CorrelationId:12;ClientId:id_4-host_dev004-port_9092;PartitionState:(dadj1,0) -> (LeaderAndIsrInfo:(Leader:3,ISR:3,4,LeaderEpoch:0,ControllerEpoch:19),ReplicationFactor:2),AllReplicas:3,4);Leaders:id:3,host:dev003,port:9092 (kafka.server.ReplicaManager)
[2013-07-16 10:56:57,959] INFO [ReplicaFetcherManager on broker 3] Removing fetcher for partition [dadj1,0] (kafka.server.ReplicaFetcherManager)
[2013-07-16 10:57:21,196] WARN [KafkaApi-3] Produce request with correlation id 2 from client on partition [dadj1,0] failed due to Leader not local for partition [dadj1,0] on broker 3 (kafka.server.KafkaApis)

dan@linux-rr29:~/git-data/kafka-current-src$ bin/kafka-topics.sh --zookeeper dev003:2181 --describe --topic dadj1 2>/dev/null
dadj1 configs: partitions: 1
topic: dadj1  partition: 0  leader: 3  replicas: 3,4  isr: 3,4

dev003's logs show that the server is elected as leader and has the correct id of 3, and ZooKeeper shows dev003 as the leader, but when I try to produce to the topic I get a failure because the server thinks it is not the leader. This occurs regardless of which server (dev003 or dev004) ends up the leader.

Here is my config, which is the same on both servers except for the broker id and host name:

[root@dev003 kafka-current-src]# grep -v -e '^#' -e '^$' config/server.properties
broker.id=3
port=9092
host.name=dev003
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=/opt/kafka/data/8.0/
num.partitions=1
log.flush.interval.messages=1
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=536870912
log.cleanup.interval.mins=1
zookeeper.connect=10.200.8.61:2181,10.200.8.62:2181,10.200.8.63:2181
zookeeper.connection.timeout.ms=100
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
kafka.csv.metrics.reporter.enabled=false
[root@dev003 kafka-current-src]#
[jira] [Commented] (KAFKA-1012) Implement an Offset Manager and hook offset requests to it
[ https://issues.apache.org/jira/browse/KAFKA-1012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13750580#comment-13750580 ]

Jun Rao commented on KAFKA-1012:

Thanks for the patch. This looks promising. Some comments:

20. ZookeeperConsumerConnector:
20.1 Currently, this class manages an OffsetFetchChannel and an OffsetCommitProducer directly. Could we use an OffsetClientManager to manage them together?
20.2 commitOffsets(): Instead of doing topicRegistry.flatMap(t => ) you can do topicRegistry.flatMap{case(topic, streamToPartitionMap) => } and reference the named fields directly.
20.3 The patch added a couple of locks to synchronize the offset commits. In KAFKA-989, we changed shutdown() to synchronize on the rebalance lock. I am wondering if we can just synchronize on the same lock for the offsets-related logic.
20.4 addPartitionTopicInfo(): If getFetchOffset returns an OffsetLoadingCode error, should we keep retrying until we get a valid offset? If so, we could write the logic in a util function. Also, the correlationId should change.

21. KafkaApis:
21.1 offsetFetchChannelPool: The controller also manages a socket connection channel pool. Could we reuse the same code or have a shared broker channel pool between the controller and KafkaApis?
21.2 handleProducerRequest(): Since producer requests to the offsets topic always use ack = -1, do we need to handle the case when ack is not -1?
21.3 handleTopicMetadataRequest(): During topic creation, it seems we can put the following line in a finally clause of the try:
topicsMetadata += new TopicMetadata(topicMetadata.topic, topicMetadata.partitionsMetadata, ErrorMapping.LeaderNotAvailableCode)
21.4 handleOffsetFetchRequest(): I am not sure this should trigger the creation of the offsets topic. It seems to me that the offsets topic should only be created at commit-offset time. If the offsets topic doesn't exist, we should probably just return an offset of -1 to the caller.

22. OffsetManager:
22.1 OffsetAndMetadata: The timestamp in the offset value is not being used.
22.2 triggerLoadOffsets(): Instead of doing currOffset = currOffset + 1 we should probably do currOffset = m.offset + 1, because offsets are not guaranteed to be consecutive when dedupe is enabled.
22.3 Should we put common statements like the following in a finally clause?
loading.remove(offsetsPartition) // unlock to prevent from blocking offset fetch requests

23. ErrorMapping: Do we need a corresponding exception for OffsetsLoadingCode?

A couple of high-level questions:

24. How do we clean up the offsets of consumers that are gone? For example, there could be many instances of console consumers that come and go. We probably don't want to keep their offsets in either the in-memory table or the offset log forever.
25. How do we plan to migrate existing consumers to this new offset storage? Do we need to stop all consumer instances and do an offset import first, or can we do this online?

Implement an Offset Manager and hook offset requests to it

Key: KAFKA-1012
URL: https://issues.apache.org/jira/browse/KAFKA-1012
Project: Kafka
Issue Type: Sub-task
Components: consumer
Reporter: Tejas Patil
Assignee: Tejas Patil
Priority: Minor
Attachments: KAFKA-1012.patch, KAFKA-1012-v2.patch

After KAFKA-657, we have a protocol for consumers to commit and fetch offsets from brokers. Currently, consumers are not using this API and are talking directly with Zookeeper. This JIRA will involve the following:
1. Add a special topic in Kafka for storing offsets.
2. Add an OffsetManager interface which would handle storing, accessing, loading and maintaining consumer offsets.
3. Implement offset managers for both of these two choices: the existing ZK-based storage, and inbuilt storage for offsets.
4. Leader brokers would now maintain an additional hash table of offsets for the group-topic-partitions that they lead.
5. Consumers should now use the OffsetCommit and OffsetFetch APIs.
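The retry-until-loaded logic suggested in comment 20.4 could be factored into a small utility along these lines (a hypothetical sketch; the error sentinel and names are illustrative, not the actual Kafka API):

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of comment 20.4: if an offset fetch reports that the
// offsets topic is still loading, back off and retry until a valid offset
// arrives or we give up.
public class OffsetFetchUtil {
    public static final long OFFSETS_LOADING = -2L; // stand-in for the OffsetLoadingCode error

    public static long fetchUntilLoaded(LongSupplier fetch, int maxRetries, long backoffMs) {
        for (int i = 0; i < maxRetries; i++) {
            long offset = fetch.getAsLong();
            if (offset != OFFSETS_LOADING) return offset; // valid offset (or a different error code)
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        throw new IllegalStateException("offsets topic still loading after " + maxRetries + " attempts");
    }
}
```

Each retry would also use a fresh correlationId, per the comment, which the real utility would thread through the fetch call.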
[jira] [Resolved] (KAFKA-677) Retention process gives exception if an empty segment is chosen for collection
[ https://issues.apache.org/jira/browse/KAFKA-677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-677.
---
Resolution: Cannot Reproduce

No longer see this. Close it for now.

Retention process gives exception if an empty segment is chosen for collection

Key: KAFKA-677
URL: https://issues.apache.org/jira/browse/KAFKA-677
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Jay Kreps
Assignee: Jay Kreps
Fix For: 0.8.1
Attachments: kafka_677-cleanup.patch

java.io.FileNotFoundException: /mnt/u001/kafka_08_long_running_test/kafka-logs/NewsActivityEvent-3/.index (No such file or directory)
at java.io.RandomAccessFile.open(Native Method)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:212)
at kafka.log.OffsetIndex.resize(OffsetIndex.scala:244)
at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:233)
at kafka.log.Log.rollToOffset(Log.scala:459)
at kafka.log.Log.roll(Log.scala:443)
at kafka.log.Log.markDeletedWhile(Log.scala:395)
at kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:241)
at kafka.log.LogManager$$anonfun$cleanupLogs$2.apply(LogManager.scala:277)
at kafka.log.LogManager$$anonfun$cleanupLogs$2.apply(LogManager.scala:275)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:495)
at kafka.log.LogManager.cleanupLogs(LogManager.scala:275)
at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:141)
at kafka.utils.Utils$$anon$2.run(Utils.scala:66)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)
[jira] [Closed] (KAFKA-677) Retention process gives exception if an empty segment is chosen for collection
[ https://issues.apache.org/jira/browse/KAFKA-677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao closed KAFKA-677.
---

Retention process gives exception if an empty segment is chosen for collection

Key: KAFKA-677
URL: https://issues.apache.org/jira/browse/KAFKA-677
[jira] [Assigned] (KAFKA-1029) Zookeeper leader election stuck in ephemeral node retry loop
[ https://issues.apache.org/jira/browse/KAFKA-1029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao reassigned KAFKA-1029:
--
Assignee: Sam Meder (was: Neha Narkhede)

Zookeeper leader election stuck in ephemeral node retry loop

Key: KAFKA-1029
URL: https://issues.apache.org/jira/browse/KAFKA-1029
Project: Kafka
Issue Type: Bug
Components: controller
Affects Versions: 0.8
Reporter: Sam Meder
Assignee: Sam Meder
Priority: Blocker
Fix For: 0.8
Attachments: 0002-KAFKA-1029-Use-brokerId-instead-of-leaderId-when-tri.patch

We're seeing the following log statements (over and over):

[2013-08-27 07:21:49,538] INFO conflict in /controller data: { brokerid:3, timestamp:1377587945206, version:1 } stored data: { brokerid:2, timestamp:1377587460904, version:1 } (kafka.utils.ZkUtils$)
[2013-08-27 07:21:49,559] INFO I wrote this conflicted ephemeral node [{ brokerid:3, timestamp:1377587945206, version:1 }] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)

where the broker is essentially stuck in the loop that is trying to deal with left-over ephemeral nodes. The code looks a bit racy to me. In particular, in ZookeeperLeaderElector:

def elect: Boolean = {
  controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
  val timestamp = SystemTime.milliseconds.toString
  val electString = ...
  try {
    createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, leaderId,
      (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
      controllerContext.zkSessionTimeout)

leaderChangeListener is registered before the create call (by the way, it looks like a new registration will be added on every elect call - shouldn't it be registered in startup()?), so it can update leaderId to the current leader before the call to create. If that happens, we will continuously get node-exists exceptions and the checker function will always return true, i.e. we will never get out of the while(true) loop. I think the right fix here is to pass brokerId instead of leaderId when calling create, i.e.:

createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
  (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
  controllerContext.zkSessionTimeout)

The loop dealing with the ephemeral node bug is then only triggered for the broker that owned the node previously, although I am still not 100% sure whether that is sufficient.
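The race boils down to which id the conflict checker compares against. A simplified Java sketch of the two variants (hypothetical names and types; the real code is Scala in ZookeeperLeaderElector):

```java
// Sketch of the race described above: the "is this conflicting node mine?"
// check must compare the stored controller id against this broker's own
// constant id (brokerId), not against a mutable leaderId field that the
// leader-change listener may already have overwritten with the new leader.
public class ElectionCheckDemo {
    public static int parseControllerId(String data) { return Integer.parseInt(data); }

    public static boolean isMyNode(String storedControllerData, int id) {
        return parseControllerId(storedControllerData) == id;
    }

    public static void main(String[] args) {
        final int brokerId = 3;
        int[] leaderId = {3}; // mutable: the listener can update it concurrently

        leaderId[0] = 2; // listener fired before create(): broker 2 is now leader

        // Buggy variant: comparing against leaderId makes broker 2's node look
        // like our own leftover, so we back off and retry forever.
        System.out.println(isMyNode("2", leaderId[0])); // true

        // Fixed variant: comparing against brokerId correctly identifies the
        // node as another broker's, and the election loop can exit.
        System.out.println(isMyNode("2", brokerId));    // false
    }
}
```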
[jira] [Assigned] (KAFKA-995) Enforce that the value for replica.fetch.max.bytes is always >= the value for message.max.bytes
[ https://issues.apache.org/jira/browse/KAFKA-995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao reassigned KAFKA-995:
-
Assignee: Sam Meder

Enforce that the value for replica.fetch.max.bytes is always >= the value for message.max.bytes

Key: KAFKA-995
URL: https://issues.apache.org/jira/browse/KAFKA-995
Project: Kafka
Issue Type: Improvement
Components: config
Affects Versions: 0.8
Reporter: Sam Meder
Assignee: Sam Meder
Priority: Minor
Fix For: 0.8, 0.8.1
Attachments: replica_fetch_size_config.patch

replica.fetch.max.bytes must always be >= message.max.bytes for replication to work correctly. This ticket adds enforcement of the constraint.
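The enforced constraint is simple to sketch (hypothetical validator, not the actual patch code): if a follower's fetch size is smaller than the largest message the broker accepts, that message can never be replicated, so startup should fail fast.

```java
// Hypothetical sketch of the constraint this ticket enforces:
// replica.fetch.max.bytes must be >= message.max.bytes, otherwise a follower
// fetch could never hold the largest message the broker will accept.
public class ReplicationConfigCheck {
    public static void validate(int replicaFetchMaxBytes, int messageMaxBytes) {
        if (replicaFetchMaxBytes < messageMaxBytes) {
            throw new IllegalArgumentException(
                "replica.fetch.max.bytes (" + replicaFetchMaxBytes
                + ") must be >= message.max.bytes (" + messageMaxBytes + ")");
        }
    }

    public static void main(String[] args) {
        validate(1048576, 1000000); // ok: fetch size can hold the largest message
    }
}
```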
[jira] [Resolved] (KAFKA-1031) Little modification to the stop script to be able to kill the proper process
[ https://issues.apache.org/jira/browse/KAFKA-1031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-1031.

Resolution: Fixed
Fix Version/s: 0.8
Assignee: Vladislav Pernin

Thanks for the patch. Committed to 0.8.

Little modification to the stop script to be able to kill the proper process

Key: KAFKA-1031
URL: https://issues.apache.org/jira/browse/KAFKA-1031
Project: Kafka
Issue Type: Bug
Reporter: Vladislav Pernin
Assignee: Vladislav Pernin
Fix For: 0.8
Attachments: 0001-Escape-the-.-in-the-kafka.Kafka-chain.patch

Escape the "." in the kafka.Kafka chain. Also add a "grep java" to match the real Java process and exclude the kafka-run-class.sh process.
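Why the dot needs escaping: in a regular expression, an unescaped "." matches any character, so a grep pattern like "kafka.Kafka" can match process lines that merely contain "kafka", any character, then "Kafka" - including the wrong process. A small demo using Java's regex engine (the same principle applies to grep in the stop script):

```java
// Demonstrates why the stop script escapes the dot: "." in a regex matches
// any character, so the unescaped pattern is looser than intended.
public class GrepPatternDemo {
    public static void main(String[] args) {
        System.out.println("run kafkaXKafka".matches(".*kafka.Kafka.*"));   // true: too loose
        System.out.println("run kafkaXKafka".matches(".*kafka\\.Kafka.*")); // false: literal dot required
        System.out.println("run kafka.Kafka".matches(".*kafka\\.Kafka.*")); // true: the intended match
    }
}
```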
[jira] [Created] (KAFKA-1038) fetch response should use empty messageset instead of null when handling errors
Jun Rao created KAFKA-1038:
--

Summary: fetch response should use empty messageset instead of null when handling errors
Key: KAFKA-1038
URL: https://issues.apache.org/jira/browse/KAFKA-1038
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Jun Rao

Saw the following exception:

Exception when handling request (kafka.server.KafkaRequestHandler)
java.lang.NullPointerException
at kafka.api.FetchResponsePartitionData.<init>(FetchResponse.scala:46)
at kafka.api.FetchRequest$$anonfun$2.apply(FetchRequest.scala:158)
at kafka.api.FetchRequest$$anonfun$2.apply(FetchRequest.scala:156)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
at scala.collection.immutable.HashMap.map(HashMap.scala:38)
at kafka.api.FetchRequest.handleError(FetchRequest.scala:156)
at kafka.server.KafkaApis.handle(KafkaApis.scala:78)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:662)
[jira] [Updated] (KAFKA-1038) fetch response should use empty messageset instead of null when handling errors
[ https://issues.apache.org/jira/browse/KAFKA-1038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1038: --- Status: Patch Available (was: Open)
[jira] [Updated] (KAFKA-1038) fetch response should use empty messageset instead of null when handling errors
[ https://issues.apache.org/jira/browse/KAFKA-1038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1038: --- Attachment: kafka-1038.patch Attach a patch.
[jira] [Commented] (KAFKA-1038) fetch response should use empty messageset instead of null when handling errors
[ https://issues.apache.org/jira/browse/KAFKA-1038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13756685#comment-13756685 ] Jun Rao commented on KAFKA-1038: This patch only fixes the response for the fetch request, so it's unrelated to ack=0, which applies only to produce requests. You did bring up a good point on ack=0, though. Will comment on kafka-955.
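The fix pattern behind KAFKA-1038 is a general one: error paths should hand back an empty message set rather than null, so code that maps over partition data never needs a null check. A minimal Java sketch of the idea (class and field names here are hypothetical; Kafka's actual implementation is in Scala):

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a fetch response's per-partition payload.
class PartitionData {
    final short errorCode;
    final List<byte[]> messageSet; // never null by construction

    PartitionData(short errorCode, List<byte[]> messageSet) {
        if (messageSet == null) {
            throw new IllegalArgumentException("messageSet must not be null");
        }
        this.errorCode = errorCode;
        this.messageSet = messageSet;
    }

    // Error responses carry an empty message set, not null, so callers
    // that iterate or map over the data never hit a NullPointerException.
    static PartitionData error(short errorCode) {
        return new PartitionData(errorCode, Collections.emptyList());
    }
}
```

Constructing the error response through a factory like this makes the "no null" invariant hold everywhere, including the handleError path where the NPE above was thrown.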
[jira] [Commented] (KAFKA-955) After a leader change, messages sent with ack=0 are lost
[ https://issues.apache.org/jira/browse/KAFKA-955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13756692#comment-13756692 ] Jun Rao commented on KAFKA-955: --- Thanks for patch v7. A couple more comments.
70. There is a long-standing bug in ProducerRequest.handleError(). If ack=0, we shouldn't send a response when the broker hits an unexpected error. We should either close the socket connection or send no response. Not sure which one is better.
71. A minor issue. The following comment in RequestChannel is a bit confusing. It sounds like it needs to read more data from the network to complete this request, but that is not the case.
/** No operation to take for the request, need to read more over the network */
def noOperation(processor: Int, request: RequestChannel.Request) {
After a leader change, messages sent with ack=0 are lost Key: KAFKA-955 URL: https://issues.apache.org/jira/browse/KAFKA-955 Project: Kafka Issue Type: Bug Affects Versions: 0.8 Reporter: Jason Rosenberg Assignee: Guozhang Wang Fix For: 0.8 Attachments: KAFKA-955.v1.patch, KAFKA-955.v1.patch, KAFKA-955.v2.patch, KAFKA-955.v3.patch, KAFKA-955.v4.patch, KAFKA-955.v5.patch, KAFKA-955.v6.patch, KAFKA-955.v7.patch If the leader changes for a partition, and a producer is sending messages with ack=0, then messages will be lost, since the producer has no active way of knowing that the leader has changed until its next metadata refresh update. The broker receiving the message, which is no longer the leader, logs a message like this:
Produce request with correlation id 7136261 from client on partition [mytopic,0] failed due to Leader not local for partition [mytopic,0] on broker 508818741
This is exacerbated by the controlled shutdown mechanism, which forces an immediate leader change. 
A possible solution would be for a broker that receives a message for a topic it is no longer the leader for (when the ack level is 0) to silently forward the message to the current leader.
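Comment 70 above describes a broker-side policy choice for ack=0 errors: since the producer expects no response, sending one is useless, and the real options are to close the connection or stay silent. A minimal Java sketch of that decision (names are hypothetical, not Kafka's actual code; closing the socket at least gives the producer some signal that something went wrong):

```java
// Hypothetical sketch of the broker-side decision for unexpected errors.
enum ErrorAction { SEND_ERROR_RESPONSE, CLOSE_CONNECTION }

class AckErrorPolicy {
    // For acks == 0 the client never reads a response, so an error
    // response would sit unread in the socket buffer; closing the
    // connection is the only observable signal we can give.
    static ErrorAction onUnexpectedError(int requiredAcks) {
        return requiredAcks == 0 ? ErrorAction.CLOSE_CONNECTION
                                 : ErrorAction.SEND_ERROR_RESPONSE;
    }
}
```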
[jira] [Commented] (KAFKA-1011) Decompression and re-compression on MirrorMaker could result in messages being dropped in the pipeline
[ https://issues.apache.org/jira/browse/KAFKA-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13756703#comment-13756703 ] Jun Rao commented on KAFKA-1011: Thanks for the patch. Not sure it works correctly, though. The issue is that in MirrorMaker, if we pass the compressed bytes to Producer, we need to let the producer mark the message as compressed (in the attribute in the message header). We can only do that by enabling compression in the producer, but we can't do that since it would compress the compressed bytes again. So we will either have to change the Producer api to give us enough of a hook to package the message correctly, or send data using SyncProducer, packaging the message in exactly the way we want; in that case, some of the logic in Producer would have to be duplicated. Decompression and re-compression on MirrorMaker could result in messages being dropped in the pipeline -- Key: KAFKA-1011 URL: https://issues.apache.org/jira/browse/KAFKA-1011 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.8.1 Attachments: KAFKA-1011.v1.patch The way MirrorMaker works today is that its consumers use a deep iterator to decompress messages received from the source brokers, and its producers re-compress the messages while sending them to the target brokers. Since MirrorMaker uses a centralized data channel for its consumers to pipe messages to its producers, and since producers compress messages of the same topic within a batch as a single produce request, messages accepted at the front end of the pipeline can be dropped at the target brokers due to a MessageSizeTooLargeException if one batch happens to contain too many messages of the same topic in MirrorMaker's producer. 
If we use a shallow iterator on the MirrorMaker's consumer side to pipe compressed messages through directly, this issue can be fixed. Also, as Swapnil pointed out, if the MirrorMaker lags and there are large messages in the MirrorMaker queue (large after decompression), it can run into an OutOfMemoryException; shallow iteration will be very helpful in avoiding this exception. The proposed solution of this issue is also related to KAFKA-527.
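The failure mode described above is just size arithmetic: when the mirror's producer re-groups messages of one topic into a single compressed wrapper message, the wrapper can exceed the broker's message.max.bytes even though every input message was individually acceptable. A small Java sketch of that arithmetic (names hypothetical; compression overhead ignored, since the point is that sizes add up):

```java
import java.util.List;

// Hypothetical model of the re-batching problem in the mirror's producer.
class MirrorBatch {
    // Size of one compressed wrapper holding all same-topic messages in a batch.
    static int wrappedSize(List<Integer> sameTopicMessageSizes) {
        return sameTopicMessageSizes.stream().mapToInt(Integer::intValue).sum();
    }

    // The target broker rejects the wrapper as a single too-large message.
    static boolean wouldBeRejected(List<Integer> sameTopicMessageSizes,
                                   int maxMessageBytes) {
        return wrappedSize(sameTopicMessageSizes) > maxMessageBytes;
    }
}
```

For example, three 400-byte messages each pass a 1000-byte limit on their own, but batched together for the same topic they form a 1200-byte wrapper and the whole batch is dropped. Shallow iteration avoids this by forwarding the source broker's original wrapper unchanged.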
[jira] [Closed] (KAFKA-937) ConsumerFetcherThread can deadlock
[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-937. - Thanks for the review. Committed the delta patch to 0.8, after fixing the logging. ConsumerFetcherThread can deadlock -- Key: KAFKA-937 URL: https://issues.apache.org/jira/browse/KAFKA-937 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: kafka-937_ConsumerOffsetChecker.patch, kafka-937_delta.patch, kafka-937.patch We have the following access pattern that can introduce a deadlock:
- AbstractFetcherThread.processPartitionsWithError() -> ConsumerFetcherThread.processPartitionsWithError() -> ConsumerFetcherManager.addPartitionsWithError() waits for the lock.
- LeaderFinderThread holds the lock while calling AbstractFetcherManager.shutdownIdleFetcherThreads().
- AbstractFetcherManager calls fetcher.shutdown, which needs to wait until AbstractFetcherThread.processPartitionsWithError() completes.
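The access pattern above forms a classic wait cycle: the fetcher thread blocks on the manager's lock while the lock holder blocks waiting for the fetcher thread to finish. One common mitigation is to never block indefinitely on the contended lock. A minimal Java sketch of that approach, with tryLock and a timeout (class names are illustrative, not Kafka's actual code):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the cycle described above, and one mitigation:
//   fetcher thread: processPartitionsWithError() -> waits for manager lock
//   leader finder:  holds manager lock -> shutdownIdleFetcherThreads()
//                   -> waits for the fetcher thread to finish
// tryLock with a timeout breaks the "wait forever" edge of the cycle.
class ManagerLock {
    private final ReentrantLock lock = new ReentrantLock();

    // Returns false instead of blocking forever, so the caller can
    // re-check its shutdown flag and retry rather than deadlock.
    boolean addPartitionsWithError(long timeoutMs) throws InterruptedException {
        if (!lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
            return false;
        }
        try {
            // ... record the partitions with errors ...
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Exposed only for the illustration below.
    ReentrantLock raw() { return lock; }
}
```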
[jira] [Commented] (KAFKA-1039) kafka return acks but not logging while sending messages with compression
[ https://issues.apache.org/jira/browse/KAFKA-1039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13757444#comment-13757444 ] Jun Rao commented on KAFKA-1039: Could you describe how to reproduce the issue? kafka return acks but not logging while sending messages with compression - Key: KAFKA-1039 URL: https://issues.apache.org/jira/browse/KAFKA-1039 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.8 Environment: ubuntu 64bit Reporter: Xiang chao Assignee: Jay Kreps When sending messages with compression, the broker returns acks but doesn't write the messages to disk, so I can't get the messages using consumers.
[jira] [Commented] (KAFKA-1039) kafka return acks but not logging while sending messages with compression
[ https://issues.apache.org/jira/browse/KAFKA-1039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13757803#comment-13757803 ] Jun Rao commented on KAFKA-1039: Is this the same problem as kafka-1037, where the snappy jar is not included in the classpath?
[jira] [Closed] (KAFKA-1039) kafka return acks but not logging while sending messages with compression
[ https://issues.apache.org/jira/browse/KAFKA-1039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-1039. --
[jira] [Resolved] (KAFKA-1039) kafka return acks but not logging while sending messages with compression
[ https://issues.apache.org/jira/browse/KAFKA-1039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-1039. Resolution: Duplicate Closing this since it's a duplicate of kafka-1037.
[jira] [Updated] (KAFKA-1035) Add message-send-max-retries and retry-backoff-ms options to console producer
[ https://issues.apache.org/jira/browse/KAFKA-1035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1035: --- Resolution: Fixed Assignee: Rajasekar Elango (was: Jun Rao) Status: Resolved (was: Patch Available) Thanks for the patch. +1. Committed to 0.8. Add message-send-max-retries and retry-backoff-ms options to console producer - Key: KAFKA-1035 URL: https://issues.apache.org/jira/browse/KAFKA-1035 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.7, 0.8 Reporter: Rajasekar Elango Assignee: Rajasekar Elango Labels: patch Fix For: 0.8 Attachments: console_producer_add_options.patch, console_producer_add_options-v2.patch It's possible for the console producer to give up too soon if it can't find a leader for a topic. Increasing message-send-max-retries would resolve this, but the console producer doesn't provide options to set message-send-max-retries and retry-backoff-ms.
[jira] [Closed] (KAFKA-1035) Add message-send-max-retries and retry-backoff-ms options to console producer
[ https://issues.apache.org/jira/browse/KAFKA-1035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-1035. --
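The two options exposed by KAFKA-1035 govern the producer's resend loop: message-send-max-retries bounds how many times a failed send is retried (for instance while no leader is available yet), and retry-backoff-ms is the pause between attempts, which gives leader election time to complete. A minimal Java sketch of that loop (names hypothetical, not the actual producer code):

```java
// Hypothetical sketch of the retry loop the two console-producer
// options control.
class RetryingSender {
    interface Attempt { boolean trySend(); }

    // One initial attempt plus up to maxRetries resends, pausing
    // backoffMs between attempts so a new leader can be elected.
    static boolean send(Attempt attempt, int maxRetries, long backoffMs)
            throws InterruptedException {
        for (int i = 0; i <= maxRetries; i++) {
            if (attempt.trySend()) return true;
            if (i < maxRetries) Thread.sleep(backoffMs);
        }
        return false;
    }
}
```

With the default low retry count, a send during a leader election exhausts its attempts and gives up; raising maxRetries (or the backoff) lets it outlast the election.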
[jira] [Resolved] (KAFKA-953) Remove release-zip from README we are not releasing with it
[ https://issues.apache.org/jira/browse/KAFKA-953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-953. --- Resolution: Fixed We actually do support the release-zip target, as well as release-tar. Remove release-zip from README we are not releasing with it --- Key: KAFKA-953 URL: https://issues.apache.org/jira/browse/KAFKA-953 Project: Kafka Issue Type: Bug Affects Versions: 0.8 Reporter: Joe Stein Priority: Blocker Labels: 0.8.0-beta1 Fix For: 0.8
[jira] [Updated] (KAFKA-1042) Fix segment flush logic
[ https://issues.apache.org/jira/browse/KAFKA-1042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1042: --- Affects Version/s: (was: 0.8) 0.8.1 Fix Version/s: (was: 0.8) This is actually for trunk. Jay, do you want to take a look? Fix segment flush logic --- Key: KAFKA-1042 URL: https://issues.apache.org/jira/browse/KAFKA-1042 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1 Environment: centos6.4 Reporter: Joris Van Remoortere Assignee: Jay Kreps Priority: Critical Labels: patch Attachments: flush_segment_fix-v1.patch The logic for deciding which segments to flush in log.flush() was missing the case where the (from, to) range was within a single segment. This case enables some of the features in the server.properties file, such as message volume / time based triggers for fsync on the log files.
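The missed case above is an interval-overlap problem: flushing the offset range [from, to) must include every segment whose offset span overlaps the range, including the case where both endpoints fall inside a single segment. A small Java sketch of the corrected selection, with segments identified by their base offsets (names hypothetical, not the actual log code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of segment selection for a flush of [from, to).
class SegmentFlush {
    // baseOffsets sorted ascending; each segment covers [base, nextBase).
    // Returns the base offsets of segments overlapping [from, to).
    static List<Long> segmentsToFlush(List<Long> baseOffsets, long from, long to) {
        List<Long> result = new ArrayList<>();
        for (int i = 0; i < baseOffsets.size(); i++) {
            long base = baseOffsets.get(i);
            long nextBase = (i + 1 < baseOffsets.size())
                    ? baseOffsets.get(i + 1) : Long.MAX_VALUE;
            // Standard interval overlap test; it naturally covers the
            // single-segment case where from and to fall between the
            // same pair of base offsets.
            if (base < to && nextBase > from) result.add(base);
        }
        return result;
    }
}
```

With segments based at 0, 100, and 200, flushing [110, 150) selects only the segment at 100, the case the original logic dropped, while [50, 150) selects the segments at 0 and 100.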
[jira] [Updated] (KAFKA-960) Upgrade Metrics to 3.x
[ https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-960: -- Affects Version/s: (was: 0.8) 0.8.1 Fix Version/s: (was: 0.8) This is likely a post 0.8 item. Moving to 0.8.1. Upgrade Metrics to 3.x -- Key: KAFKA-960 URL: https://issues.apache.org/jira/browse/KAFKA-960 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1 Reporter: Cosmin Lehene Now that metrics 3.0 has been released (http://metrics.codahale.com/about/release-notes/) we can upgrade back
[jira] [Updated] (KAFKA-984) Avoid a full rebalance in cases when a new topic is discovered but container/broker set stay the same
[ https://issues.apache.org/jira/browse/KAFKA-984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-984: -- Fix Version/s: (was: 0.8) 0.8.1 Since the changes are relatively large and subtle, we should probably revisit this post 0.8. Moving it to 0.8.1. Avoid a full rebalance in cases when a new topic is discovered but container/broker set stay the same - Key: KAFKA-984 URL: https://issues.apache.org/jira/browse/KAFKA-984 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.8.1 Attachments: KAFKA-984.v1.patch, KAFKA-984.v2.patch, KAFKA-984.v2.patch Currently a full rebalance will be triggered on high level consumers even when just a new topic is added to ZK. Better to avoid this behavior and rebalance only on the newly added topic.
[jira] [Updated] (KAFKA-995) Enforce that the value for replica.fetch.max.bytes is always >= the value for message.max.bytes
[ https://issues.apache.org/jira/browse/KAFKA-995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-995: -- Resolution: Fixed Fix Version/s: (was: 0.8) Status: Resolved (was: Patch Available) Enforce that the value for replica.fetch.max.bytes is always >= the value for message.max.bytes --- Key: KAFKA-995 URL: https://issues.apache.org/jira/browse/KAFKA-995 Project: Kafka Issue Type: Improvement Components: config Affects Versions: 0.8 Reporter: Sam Meder Assignee: Sam Meder Priority: Minor Fix For: 0.8.1 Attachments: replica_fetch_size_config.patch replica.fetch.max.bytes must always be >= message.max.bytes for replication to work correctly. This ticket adds enforcement of the constraint.
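The constraint enforced by KAFKA-995 exists because a follower fetches in chunks of replica.fetch.max.bytes; if that is smaller than the largest message the broker will accept, replication of a max-size message can never make progress. A minimal Java sketch of a startup-time check (names hypothetical, not the actual KafkaConfig code):

```java
// Hypothetical sketch of the config validation this ticket adds.
class ReplicationConfigCheck {
    static void validate(int replicaFetchMaxBytes, int messageMaxBytes) {
        // A follower must be able to fetch the largest acceptable message
        // in one request, or replication stalls on that message forever.
        if (replicaFetchMaxBytes < messageMaxBytes) {
            throw new IllegalArgumentException(
                "replica.fetch.max.bytes (" + replicaFetchMaxBytes +
                ") must be >= message.max.bytes (" + messageMaxBytes + ")");
        }
    }
}
```

Failing fast at startup turns a subtle runtime replication stall into an immediate, explicit configuration error.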
[jira] [Updated] (KAFKA-997) Provide a strict verification mode when reading configuration properties
[ https://issues.apache.org/jira/browse/KAFKA-997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-997: -- Affects Version/s: (was: 0.8) Fix Version/s: (was: 0.8) 0.8.1 Since this has a low priority, I am moving it to 0.8.1. Could you provide a patch against trunk instead? Provide a strict verification mode when reading configuration properties Key: KAFKA-997 URL: https://issues.apache.org/jira/browse/KAFKA-997 Project: Kafka Issue Type: Improvement Components: config Reporter: Sam Meder Assignee: Sam Meder Priority: Minor Fix For: 0.8.1 Attachments: strict-verification-2.patch This ticket is based on the discussion in KAFKA-943. It introduces a new property that makes the config system throw an exception when it encounters unrecognized properties (instead of a simple warn-level log statement). This new property defaults to false. Hopefully this will result in fewer instances of out-of-date configuration.
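The behavior KAFKA-997 proposes boils down to a set difference: compare the supplied property names against the known ones, and in strict mode treat any unknown key as an error instead of a warning. A minimal Java sketch (names hypothetical, not the actual config code):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of strict configuration verification.
class ConfigVerifier {
    static void verify(Set<String> supplied, Set<String> known, boolean strict) {
        Set<String> unknown = new HashSet<>(supplied);
        unknown.removeAll(known); // keys the config system doesn't recognize
        if (!unknown.isEmpty() && strict) {
            // Strict mode: fail fast so stale or misspelled keys are caught.
            throw new IllegalArgumentException("Unrecognized properties: " + unknown);
        }
        // Non-strict mode: the real system would log these at warn level.
    }
}
```

Defaulting strict to false preserves the old warn-only behavior while letting careful deployments opt into failing fast.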
[jira] [Created] (KAFKA-1052) integrate add-partitions command into topicCommand
Jun Rao created KAFKA-1052: -- Summary: integrate add-partitions command into topicCommand Key: KAFKA-1052 URL: https://issues.apache.org/jira/browse/KAFKA-1052 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1 Reporter: Jun Rao Assignee: Sriram Subramanian After merging from 0.8 (kafka-1051), we dragged in a new admin command add-partitions. This needs to be integrated with the general topicCommand.
[jira] [Updated] (KAFKA-1051) merge from 0.8 da4512174b6f7c395ffe053a86d2c6bb19d2538a to trunk
[ https://issues.apache.org/jira/browse/KAFKA-1051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1051: --- Attachment: kafka-1051.patch Attach a patch for review. Somehow, couldn't publish a reviewboard because of the following error: Error data: {u'stat': u'fail', u'file': u'bin/kafka-console-consumer-log4j.properties', u'err': {u'msg': u'The file was not found in the repository', u'code': 207}, u'revision': u''} merge from 0.8 da4512174b6f7c395ffe053a86d2c6bb19d2538a to trunk Key: KAFKA-1051 URL: https://issues.apache.org/jira/browse/KAFKA-1051 Project: Kafka Issue Type: Task Affects Versions: 0.8.1 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8.1 Attachments: kafka-1051.patch
[jira] [Commented] (KAFKA-970) ./sbt +package rebuilds the Hadoop consumer jar N times with the same output file
[ https://issues.apache.org/jira/browse/KAFKA-970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13764540#comment-13764540 ] Jun Rao commented on KAFKA-970: --- This affects the following jars:
./contrib/hadoop-consumer/target/hadoop-consumer-0.8.0-beta1.jar
./contrib/hadoop-producer/target/hadoop-producer-0.8.0-beta1.jar
./examples/target/kafka-java-examples-0.8.0-beta1.jar
./sbt +package rebuilds the Hadoop consumer jar N times with the same output file - Key: KAFKA-970 URL: https://issues.apache.org/jira/browse/KAFKA-970 Project: Kafka Issue Type: Bug Affects Versions: 0.8 Reporter: Jay Kreps Fix For: 0.8 Running ./sbt +package now builds all jars for all scala versions. Unfortunately for the Hadoop producer and consumer, since it uses the same file name every time, this just means it is overwriting the same file over and over, and the final file is whatever the last scala version is that is built. This should be a trivial fix.
[jira] [Commented] (KAFKA-1051) merge from 0.8 da4512174b6f7c395ffe053a86d2c6bb19d2538a to trunk
[ https://issues.apache.org/jira/browse/KAFKA-1051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13764503#comment-13764503 ] Jun Rao commented on KAFKA-1051: Will leave this jira unclosed for a few days in case people notice any issue with the merge.
[jira] [Updated] (KAFKA-1051) merge from 0.8 da4512174b6f7c395ffe053a86d2c6bb19d2538a to trunk
[ https://issues.apache.org/jira/browse/KAFKA-1051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1051: --- Resolution: Fixed Status: Resolved (was: Patch Available) Since all merge conflicts were easy to resolve, committed the patch to trunk. All unit tests pass in trunk. Filed KAFKA-1052 to integrate the addPartitions command into topicCommand. The following is a summary of the changes.
# On branch trunk
# Changes to be committed:
#
#   modified: README.md
#   modified: bin/kafka-check-reassignment-status.sh
#   deleted: bin/kafka-console-consumer-log4j.properties
#   modified: bin/kafka-console-consumer.sh
#   modified: bin/kafka-console-producer.sh
#   modified: bin/kafka-consumer-perf-test.sh
#   modified: bin/kafka-preferred-replica-election.sh
#   modified: bin/kafka-producer-perf-test.sh
#   modified: bin/kafka-reassign-partitions.sh
#   modified: bin/kafka-replay-log-producer.sh
#   modified: bin/kafka-run-class.sh
#   modified: bin/kafka-server-start.sh
#   modified: bin/kafka-server-stop.sh
#   modified: bin/kafka-simple-consumer-perf-test.sh
#   modified: bin/zookeeper-server-start.sh
#   modified: config/log4j.properties
#   modified: config/producer.properties
#   renamed: bin/kafka-add-partitions.sh -> config/tools-log4j.properties
#   modified: contrib/hadoop-producer/README.md
#   modified: contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
#   modified: contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
#   new file: core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
#   modified: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
#   modified: core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
#   modified: core/src/main/scala/kafka/cluster/Broker.scala
#   modified: core/src/main/scala/kafka/cluster/Partition.scala
#   modified: core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
#   modified: core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
#   modified: core/src/main/scala/kafka/consumer/SimpleConsumer.scala
#   modified: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
#   modified: core/src/main/scala/kafka/controller/KafkaController.scala
#   modified: core/src/main/scala/kafka/controller/PartitionStateMachine.scala
#   modified: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
#   modified: core/src/main/scala/kafka/network/RequestChannel.scala
#   modified: core/src/main/scala/kafka/network/SocketServer.scala
#   new file: core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
#   modified: core/src/main/scala/kafka/producer/ConsoleProducer.scala
#   modified: core/src/main/scala/kafka/producer/Producer.scala
#   modified: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
#   modified: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
#   modified: core/src/main/scala/kafka/server/KafkaApis.scala
#   modified: core/src/main/scala/kafka/server/KafkaConfig.scala
#   modified: core/src/main/scala/kafka/server/KafkaRequestHandler.scala
#   modified: core/src/main/scala/kafka/server/ReplicaManager.scala
#   modified: core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
#   modified: core/src/main/scala/kafka/tools/ImportZkOffsets.scala
#   modified: core/src/main/scala/kafka/tools/MirrorMaker.scala
#   new file: core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
#   modified: core/src/test/scala/unit/kafka/producer/ProducerTest.scala
#   modified: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
#   modified: core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
#   modified: core/src/test/scala/unit/kafka/utils/TestUtils.scala
#   modified: project/Build.scala
#   modified: system_test/migration_tool_testsuite/0.7/lib/kafka-perf-0.7.0.jar
#   modified: system_test/migration_tool_testsuite/config/migration_producer.properties
#   modified: system_test/migration_tool_testsuite/migration_tool_test.py
#   modified: system_test/migration_tool_testsuite/testcase_9001/testcase_9001_properties.json
#   modified: system_test/migration_tool_testsuite/testcase_9003/testcase_9003_properties.json
#   modified: system_test/migration_tool_testsuite/testcase_9004/testcase_9004_properties.json
#   modified: system_test/migration_tool_testsuite/testcase_9005/testcase_9005_properties.json
#   modified: system_test/migration_tool_testsuite/testcase_9006/testcase_9006_properties.json
#   modified: system_test/mirror_maker_testsuite/mirror_maker_test.py
#   modified: system_test/replication_testsuite/replica_basic_test.py
#   modified: system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json
#   modified: system_test/replication_testsuite/testcase_0002/testcase_0002_properties.json
#   modified:
[jira] [Commented] (KAFKA-1046) Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x
[ https://issues.apache.org/jira/browse/KAFKA-1046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13764544#comment-13764544 ] Jun Rao commented on KAFKA-1046: Chris, I merged everything in 0.8 before your change to trunk. Could you rebase your patch for trunk? Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x -- Key: KAFKA-1046 URL: https://issues.apache.org/jira/browse/KAFKA-1046 Project: Kafka Issue Type: Improvement Affects Versions: 0.8 Reporter: Christopher Freeman Assignee: Christopher Freeman Attachments: kafka_2_10_refactor_0.8.patch, kafka_2_10_refactor.patch, Screen Shot 2013-09-09 at 9.34.09 AM.png I refactored the project such that it will compile against Scala 2.10.1. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Created] (KAFKA-1051) merge from 0.8 da4512174b6f7c395ffe053a86d2c6bb19d2538a to trunk
Jun Rao created KAFKA-1051: -- Summary: merge from 0.8 da4512174b6f7c395ffe053a86d2c6bb19d2538a to trunk Key: KAFKA-1051 URL: https://issues.apache.org/jira/browse/KAFKA-1051 Project: Kafka Issue Type: Task Affects Versions: 0.8.1 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8.1 -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-1051) merge from 0.8 da4512174b6f7c395ffe053a86d2c6bb19d2538a to trunk
[ https://issues.apache.org/jira/browse/KAFKA-1051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1051: --- Status: Patch Available (was: Open) merge from 0.8 da4512174b6f7c395ffe053a86d2c6bb19d2538a to trunk Key: KAFKA-1051 URL: https://issues.apache.org/jira/browse/KAFKA-1051 Project: Kafka Issue Type: Task Affects Versions: 0.8.1 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8.1 Attachments: kafka-1051.patch -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-1051) merge from 0.8 da4512174b6f7c395ffe053a86d2c6bb19d2538a to trunk
[ https://issues.apache.org/jira/browse/KAFKA-1051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13765141#comment-13765141 ] Jun Rao commented on KAFKA-1051: There's a problem in generating a reviewboard with this patch, as I posted earlier. Now that the patch is checked in, it shouldn't be too hard to review the changes from IntelliJ. Could you review the changes first and see if it's too much of a hassle to put in review comments? merge from 0.8 da4512174b6f7c395ffe053a86d2c6bb19d2538a to trunk Key: KAFKA-1051 URL: https://issues.apache.org/jira/browse/KAFKA-1051 Project: Kafka Issue Type: Task Affects Versions: 0.8.1 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8.1 Attachments: kafka-1051.patch
[jira] [Commented] (KAFKA-925) Add optional partition key override in producer
[ https://issues.apache.org/jira/browse/KAFKA-925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13765591#comment-13765591 ] Jun Rao commented on KAFKA-925: --- Thanks for the post commit patch. Looks good. +1. Add optional partition key override in producer --- Key: KAFKA-925 URL: https://issues.apache.org/jira/browse/KAFKA-925 Project: Kafka Issue Type: New Feature Components: producer Affects Versions: 0.8.1 Reporter: Jay Kreps Assignee: Jay Kreps Fix For: 0.8.1 Attachments: KAFKA-925-post-commit-v1.patch, KAFKA-925-v1.patch, KAFKA-925-v2.patch We have a key that is used for partitioning in the producer and stored with the message. Actually these uses, though often the same, could be different. The two meanings are effectively: 1. Assignment to a partition 2. Deduplication within a partition In cases where we want to allow the client to take advantage of both of these and they aren't the same it would be nice to allow them to be specified separately. To implement this I added an optional partition key to KeyedMessage. When specified this key is used for partitioning rather than the message key. This key is of type Any and the parametric typing is removed from the partitioner to allow it to work with either key. An alternative would be to allow the partition id to specified in the KeyedMessage. This would be slightly more convenient in the case where there is no partition key but instead you know a priori the partition number--this case must be handled by giving the partition id as the partition key and using an identity partitioner which is slightly more roundabout. However this is inconsistent with the normal partitioning which requires a key in the case where the partition is determined by a key--in that case you would be manually calling your partitioner in user code. It seems best to me to either use a key or always a partition and since we currently take a key I stuck with that. 
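The separation Jay describes (one key for partition assignment, another stored with the message for deduplication) can be sketched as follows. This is an illustrative model, not Kafka's actual KeyedMessage or Partitioner API; the names Keyed and pickPartition are assumptions.

```java
import java.util.Objects;

// Illustrative message holder: an optional partition key overrides the
// stored key for partitioning only (names are hypothetical, not Kafka's).
class Keyed<K, V> {
    final K key;               // stored with the message, used for dedup
    final Object partitionKey; // optional override, used only for partitioning
    final V value;

    Keyed(K key, Object partitionKey, V value) {
        this.key = key;
        this.partitionKey = partitionKey;
        this.value = value;
    }

    // The effective partitioning input: the override if present, else the key.
    Object partitioningKey() {
        return partitionKey != null ? partitionKey : key;
    }
}

public class PartitionKeyDemo {
    // Untyped partitioner (takes Object, mirroring the "type Any" point above).
    static int pickPartition(Object key, int numPartitions) {
        return Math.abs(Objects.hashCode(key) % numPartitions);
    }

    public static void main(String[] args) {
        // Same dedup key, but m2 pins partitioning via an integer override,
        // the "identity partitioner" pattern mentioned in the description.
        Keyed<String, String> m1 = new Keyed<>("dedup-id", null, "v");
        Keyed<String, String> m2 = new Keyed<>("dedup-id", 3, "v");
        System.out.println(pickPartition(m1.partitioningKey(), 4));
        System.out.println(pickPartition(m2.partitioningKey(), 4)); // 3 % 4 = 3
    }
}
```

This also shows why the description prefers a partition key over a raw partition id: both cases flow through the same partitioner, instead of branching between "keyed" and "manually numbered" sends.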
[jira] [Commented] (KAFKA-1008) Unmap before resizing
[ https://issues.apache.org/jira/browse/KAFKA-1008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13765579#comment-13765579 ] Jun Rao commented on KAFKA-1008: Thanks for the patch. Reviewed patch v7 for 0.8. Looks good overall. Just a couple of minor comments. 70. OffsetIndex: Should forceUnmap() be private? 71. Just a question. Does anyone know if the OS property shows up as windows on cygwin? Sriram, For 1.1, the purpose is probably to save the division calculation, which is a bit expensive. Unmap before resizing - Key: KAFKA-1008 URL: https://issues.apache.org/jira/browse/KAFKA-1008 Project: Kafka Issue Type: Bug Components: core, log Affects Versions: 0.8 Environment: Windows, Linux, Mac OS Reporter: Elizabeth Wei Assignee: Jay Kreps Labels: patch Fix For: 0.8 Attachments: KAFKA-0.8-1008-v7.patch, KAFKA-1008-v6.patch, KAFKA-trunk-1008-v7.patch, unmap-v5.patch Original Estimate: 1h Remaining Estimate: 1h While I was studying how MappedByteBuffer works, I saw a sharing runtime exception on Windows. I applied what I learned to generate a patch which uses an internal open JDK API to solve this problem. Following Jay's advice, I made a helper method called tryUnmap(). -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
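The helper the reporter mentions (tryUnmap()) can be sketched as below. This is a best-effort sketch, not the actual Kafka patch: it reflects on the JDK-internal cleaner of a direct buffer, which works on JDK 8 but is assumed to fail gracefully (and simply skip the unmap) on newer JDKs where the module system blocks access.

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.lang.reflect.Method;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class UnmapDemo {
    // Best-effort unmap of a MappedByteBuffer via the internal cleaner.
    // Returns true if the unmap ran, false if the internal API is unavailable
    // (in which case the mapping is only released on GC).
    static boolean tryUnmap(MappedByteBuffer buffer) {
        try {
            Method cleanerMethod = buffer.getClass().getMethod("cleaner");
            cleanerMethod.setAccessible(true);
            Object cleaner = cleanerMethod.invoke(buffer);
            Method clean = cleaner.getClass().getMethod("clean");
            clean.setAccessible(true);
            clean.invoke(cleaner);
            return true;
        } catch (Throwable t) {
            // No internal cleaner API on this JDK/platform: skip the unmap.
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        File f = File.createTempFile("offset-index", ".tmp");
        f.deleteOnExit();
        try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
            MappedByteBuffer buf =
                raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1024);
            // Unmap before resizing/renaming; this matters most on Windows,
            // where a mapped file cannot be modified or deleted.
            System.out.println("unmapped=" + tryUnmap(buf));
        }
    }
}
```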
[jira] [Updated] (KAFKA-923) Improve controller failover latency
[ https://issues.apache.org/jira/browse/KAFKA-923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-923: -- Resolution: Fixed Fix Version/s: 0.8 Status: Resolved (was: Patch Available) Improve controller failover latency --- Key: KAFKA-923 URL: https://issues.apache.org/jira/browse/KAFKA-923 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8 Reporter: Neha Narkhede Assignee: Neha Narkhede Priority: Critical Labels: kafka-0.8 Fix For: 0.8 Attachments: kafka-923-v1.patch, kafka-923-v2.patch During controller failover, we do the following things - 1. Increment controller epoch 2. Initialize controller context 3. Initialize replica state machine 4. Initialize partition state machine During step 2 above, we read the information of all topics and partitions, the replica assignments and leadership information. During step 3 and 4, we re-read some of this information from zookeeper. Since the zookeeper reads are proportional to the number of topics and the reads are serial, it is important to optimize this. The zookeeper reads in steps 3 and 4 are not required. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-1020) Remove getAllReplicasOnBroker from KafkaController
[ https://issues.apache.org/jira/browse/KAFKA-1020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1020: --- Fix Version/s: 0.8.1 Remove getAllReplicasOnBroker from KafkaController -- Key: KAFKA-1020 URL: https://issues.apache.org/jira/browse/KAFKA-1020 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.8.1 Attachments: KAFKA-1020.v1.patch Today KafkaController calls getAllReplicasOnBroker on broker failure and new broker startup to get all the replicas that broker is holding (or is supposed to hold). This function actually issues a read on each topic's partition znodes. With a large number of topics/partitions this could seriously increase the latency of handling broker failure and new broker startup. On the other hand, ControllerContext maintains a partitionReplicaAssignment cache, which is designed to keep the most up-to-date partition replica assignment according to ZK. So instead of reading from ZK, we could just read from the local cache, given that partitionReplicaAssignment is guaranteed to be up-to-date.
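The proposed fix amounts to a filter over the cached assignment map instead of one ZooKeeper read per partition. A rough sketch, with data types simplified from the Scala original (the map below stands in for ControllerContext.partitionReplicaAssignment; names are illustrative):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class ReplicaCacheDemo {
    // Given a cached partition -> assigned-broker-ids map, return the
    // partitions hosted on a broker without touching ZooKeeper.
    static Set<String> replicasOnBroker(Map<String, List<Integer>> assignment,
                                        int brokerId) {
        Set<String> partitions = new TreeSet<>();
        for (Map.Entry<String, List<Integer>> e : assignment.entrySet()) {
            if (e.getValue().contains(brokerId)) {
                partitions.add(e.getKey()); // no per-partition ZK round trip
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> assignment = new HashMap<>();
        assignment.put("topicA-0", Arrays.asList(1, 2));
        assignment.put("topicA-1", Arrays.asList(2, 3));
        assignment.put("topicB-0", Arrays.asList(1, 3));
        System.out.println(replicasOnBroker(assignment, 1)); // [topicA-0, topicB-0]
    }
}
```

The cost drops from O(partitions) ZK reads to a single in-memory scan, which is exactly the latency win the issue describes for broker failure and startup handling.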
[jira] [Commented] (KAFKA-998) Producer should not retry on non-recoverable error codes
[ https://issues.apache.org/jira/browse/KAFKA-998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13765633#comment-13765633 ] Jun Rao commented on KAFKA-998: --- Thanks for the patch. Looks good overall. Just one comment: 10. ErrorMapping.fatalException(): Should we rename it to unrecoverableException? MessageSizeTooLarge doesn't seem like a fatal exception. I am not sure if it's worth patching this in 0.8. The workaround is to reduce the batch size, as well as reducing the retry count and retry interval. Producer should not retry on non-recoverable error codes Key: KAFKA-998 URL: https://issues.apache.org/jira/browse/KAFKA-998 Project: Kafka Issue Type: Bug Affects Versions: 0.8, 0.8.1 Reporter: Joel Koshy Assignee: Guozhang Wang Attachments: KAFKA-998.v1.patch Based on a discussion with Guozhang. The producer currently retries on all error codes (including MessageSizeTooLarge, which is pointless to retry on). This can slow down the producer unnecessarily. If at all we want to retry on that error code we would need to retry with a smaller batch size, but that's a separate discussion.
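The behavior being requested, stop retrying on error codes that cannot succeed on retry, might look like the sketch below. The enum values and method names are illustrative, not Kafka's ErrorMapping constants:

```java
import java.util.EnumSet;
import java.util.Set;

public class RetryDemo {
    enum ErrorCode { NONE, LEADER_NOT_AVAILABLE, MESSAGE_SIZE_TOO_LARGE }

    // Errors no amount of retrying will fix with the same batch; retrying
    // them only slows the producer down, which is the point of this issue.
    static final Set<ErrorCode> UNRECOVERABLE =
        EnumSet.of(ErrorCode.MESSAGE_SIZE_TOO_LARGE);

    // Simulate a send that keeps returning the same error code; count how
    // many attempts the retry loop actually spends on it.
    static int attemptsUsed(ErrorCode result, int maxRetries) {
        int attempts = 0;
        for (int i = 0; i <= maxRetries; i++) {
            attempts++;
            if (result == ErrorCode.NONE) break;       // success
            if (UNRECOVERABLE.contains(result)) break; // give up immediately
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println(attemptsUsed(ErrorCode.MESSAGE_SIZE_TOO_LARGE, 3)); // 1
        System.out.println(attemptsUsed(ErrorCode.LEADER_NOT_AVAILABLE, 3));   // 4
    }
}
```

A transient error such as LEADER_NOT_AVAILABLE still burns all retries, while MESSAGE_SIZE_TOO_LARGE fails fast after a single attempt.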
[jira] [Commented] (KAFKA-948) ISR list in LeaderAndISR path not updated for partitions when Broker (which is not leader) is down
[ https://issues.apache.org/jira/browse/KAFKA-948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13766575#comment-13766575 ] Jun Rao commented on KAFKA-948: --- Is this still happening in 0.8? ISR list in LeaderAndISR path not updated for partitions when Broker (which is not leader) is down -- Key: KAFKA-948 URL: https://issues.apache.org/jira/browse/KAFKA-948 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 0.8 Reporter: Dibyendu Bhattacharya Assignee: Neha Narkhede When the broker which is the leader for a partition is down, the ISR list in the LeaderAndISR path is updated. But if a broker which is not the leader of the partition is down, the ISR list is not updated. This is an issue because the ISR list then contains stale entries. I found this issue in kafka-0.8.0-beta1-candidate1
[jira] [Resolved] (KAFKA-973) Messages From Producer Not being Partitioned
[ https://issues.apache.org/jira/browse/KAFKA-973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-973. --- Resolution: Not A Problem Messages From Producer Not being Partitioned - Key: KAFKA-973 URL: https://issues.apache.org/jira/browse/KAFKA-973 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8 Environment: Linux Reporter: Subbu Srinivasan Assignee: Neha Narkhede Labels: newbie I created a two node cluster: 2 ZooKeepers, 2 brokers, 1 topic with replication factor 2 and 2 partitions. My consumer group has two threads. 1) From my Java client I send a few messages to the topic. I have set multiple brokers kafka2:9092,kafka1:9092. Only one thread in my consumer always gets the messages. It looks like the producer is not partitioning the requests properly. 2) However if I send some samples using the simple console producer, I see multiple threads getting requests and the load is balanced. What am I doing wrong in my client?

public class KafkaProducer {
    private final Properties props = new Properties();
    private static AtomicLong counter = new AtomicLong(0);
    kafka.javaapi.producer.Producer<Integer, String> producer = null;

    public KafkaProducer() {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", ConfigurationUtility.getKafkaHost());
        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
    }

    public void sendMessage(String msg) throws Exception {
        producer.send(new KeyedMessage<Integer, String>(ConfigurationUtility.getTopicName(), msg));
    }

    public static void main(String arg[]) throws Exception {
        ConfigurationUtility.setKafkaHost("kafka2:9092,kafka1:9092");
        ConfigurationUtility.setTopicName("dnslog");
        ConfigurationUtility.setZooKeeperHost("kafka1:2181,kafka2:2181");
        ConfigurationUtility.setConsumerGroupId("dnslog");
        for (int i = 0; i < 2; ++i) {
            (new KafkaProducer()).sendMessage(UUID.randomUUID().toString());
        }
    }
}
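A likely reason this was resolved "Not A Problem": the client above sends messages without a key, and the 0.8 producer picks a partition for unkeyed messages and reuses it until the next metadata refresh, so a short-lived producer lands everything on one partition. Supplying an explicit key makes placement deterministic. The sketch below only models the hash-by-key idea; it is not Kafka's DefaultPartitioner class:

```java
public class KeyedSendDemo {
    // Illustrative hash partitioner: deterministic placement from a key.
    static int partitionFor(Object key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 2;
        for (int i = 0; i < 4; i++) {
            // With explicit Integer keys, consecutive keys alternate between
            // the two partitions, so both consumer threads receive messages.
            System.out.println("key=" + i + " -> partition "
                + partitionFor(i, numPartitions));
        }
    }
}
```

In the original client this would mean passing a key to KeyedMessage (the three-argument form) instead of only topic and message.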
[jira] [Updated] (KAFKA-972) MetadataRequest returns stale list of brokers
[ https://issues.apache.org/jira/browse/KAFKA-972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-972: -- Fix Version/s: (was: 0.8) Could you describe how to reproduce this? MetadataRequest returns stale list of brokers - Key: KAFKA-972 URL: https://issues.apache.org/jira/browse/KAFKA-972 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Vinicius Carvalho When we issue an metadatarequest towards the cluster, the list of brokers is stale. I mean, even when a broker is down, it's returned back to the client. The following are examples of two invocations one with both brokers online and the second with a broker down: { brokers: [ { nodeId: 0, host: 10.139.245.106, port: 9092, byteLength: 24 }, { nodeId: 1, host: localhost, port: 9093, byteLength: 19 } ], topicMetadata: [ { topicErrorCode: 0, topicName: foozbar, partitions: [ { replicas: [ 0 ], isr: [ 0 ], partitionErrorCode: 0, partitionId: 0, leader: 0, byteLength: 26 }, { replicas: [ 1 ], isr: [ 1 ], partitionErrorCode: 0, partitionId: 1, leader: 1, byteLength: 26 }, { replicas: [ 0 ], isr: [ 0 ], partitionErrorCode: 0, partitionId: 2, leader: 0, byteLength: 26 }, { replicas: [ 1 ], isr: [ 1 ], partitionErrorCode: 0, partitionId: 3, leader: 1, byteLength: 26 }, { replicas: [ 0 ], isr: [ 0 ], partitionErrorCode: 0, partitionId: 4, leader: 0, byteLength: 26 } ], byteLength: 145 } ], responseSize: 200, correlationId: -1000 } { brokers: [ { nodeId: 0, host: 10.139.245.106, port: 9092, byteLength: 24 }, { nodeId: 1, host: localhost, port: 9093, byteLength: 19 } ], topicMetadata: [ { topicErrorCode: 0, topicName: foozbar, partitions: [ { replicas: [ 0 ], isr: [], partitionErrorCode: 5, partitionId: 0, leader: -1, byteLength: 22 }, { replicas: [ 1 ], isr: [ 1 ], partitionErrorCode: 0, partitionId: 1, leader: 1, byteLength: 26 }, { replicas: [ 0 ], isr: [], partitionErrorCode: 5, partitionId: 2, leader: -1, byteLength: 22 }, { replicas: [ 1 ], isr: [ 1 
], partitionErrorCode: 0, partitionId: 3, leader: 1, byteLength: 26 }, { replicas: [ 0
[jira] [Commented] (KAFKA-975) Leader not local for partition when partition is leader (kafka.common.NotLeaderForPartitionException)
[ https://issues.apache.org/jira/browse/KAFKA-975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13766567#comment-13766567 ] Jun Rao commented on KAFKA-975: --- Any error in the controller and state-change log? Leader not local for partition when partition is leader (kafka.common.NotLeaderForPartitionException) - Key: KAFKA-975 URL: https://issues.apache.org/jira/browse/KAFKA-975 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 0.8 Environment: centos 6.4 Reporter: Dan Swanson Assignee: Neha Narkhede I have a two server kafka cluster (dev003 and dev004). I am following the example from this URL but using two servers with a single kafka instance instead of using 1 server with two instances. http://www.michael-noll.com/blog/2013/03/13/running-a-multi-broker-apache-kafka-cluster-on-a-single-node/ Using the following trunk version commit c27c768463a5dc6be113f2e5b3e00bf8d9d9d602 Author: David Arthur mum...@gmail.com Date: Thu Jul 11 15:34:57 2013 -0700 KAFKA-852, remove clientId from Offset{Fetch,Commit}Response. Reviewed by Jay. -- [2013-07-16 10:56:50,279] INFO [Kafka Server 3], started (kafka.server.KafkaServer) -- dan@linux-rr29:~/git-data/kafka-current-src bin/kafka-topics.sh --zookeeper dev003:2181 --create --topic dadj1 --partitions 1 --replication-factor 2 2>/dev/null Created topic dadj1.
dan@linux-rr29:~/git-data/kafka-current-src --- [2013-07-16 10:56:57,946] INFO [Replica Manager on Broker 3]: Handling LeaderAndIsr request Name:LeaderAndIsrRequest;Version:0;Controller:4;ControllerEpoch:19;CorrelationId:12;ClientId:id_4-host_dev004-port_9092;PartitionState:(dadj1,0) -> (LeaderAndIsrInfo:(Leader:3,ISR:3,4,LeaderEpoch:0,ControllerEpoch:19),ReplicationFactor:2),AllReplicas:3,4);Leaders:id:3,host:dev003,port:9092 (kafka.server.ReplicaManager) [2013-07-16 10:56:57,959] INFO [ReplicaFetcherManager on broker 3] Removing fetcher for partition [dadj1,0] (kafka.server.ReplicaFetcherManager) [2013-07-16 10:57:21,196] WARN [KafkaApi-3] Produce request with correlation id 2 from client on partition [dadj1,0] failed due to Leader not local for partition [dadj1,0] on broker 3 (kafka.server.KafkaApis) - dan@linux-rr29:~/git-data/kafka-current-src bin/kafka-topics.sh --zookeeper dev003:2181 --describe --topic dadj1 2>/dev/null dadj1 configs: partitions: 1 topic: dadj1 partition: 0 leader: 3 replicas: 3,4 isr: 3,4 dan@linux-rr29:~/git-data/kafka-current-src Dev003 logs show that server is elected as leader and has correct id of 3, zookeeper shows dev003 is leader, but when I try to produce to the topic I get a failure because the server thinks it is not the leader. This occurs regardless of which server (dev003 or dev004) ends up the leader.
Here is my config which is the same except for the broker id and host names [root@dev003 kafka-current-src]# grep -v -e '^#' -e '^$' config/server.properties broker.id=3 port=9092 host.name=dev003 num.network.threads=2 num.io.threads=2 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 log.dir=/opt/kafka/data/8.0/ num.partitions=1 log.flush.interval.messages=1 log.flush.interval.ms=1000 log.retention.hours=168 log.segment.bytes=536870912 log.cleanup.interval.mins=1 zookeeper.connect=10.200.8.61:2181,10.200.8.62:2181,10.200.8.63:2181 zookeeper.connection.timeout.ms=100 kafka.metrics.polling.interval.secs=5 kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter kafka.csv.metrics.dir=/tmp/kafka_metrics kafka.csv.metrics.reporter.enabled=false [root@dev003 kafka-current-src]# -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-1040) ConsumerConfig and ProducerConfig do work in the Constructor
[ https://issues.apache.org/jira/browse/KAFKA-1040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1040: --- Fix Version/s: 0.8.1 ConsumerConfig and ProducerConfig do work in the Constructor -- Key: KAFKA-1040 URL: https://issues.apache.org/jira/browse/KAFKA-1040 Project: Kafka Issue Type: Improvement Components: config, consumer, producer Affects Versions: 0.8 Environment: Java 1.7 Linux Mint 14 (64bit) Reporter: Sharmarke Aden Assignee: Neha Narkhede Priority: Minor Labels: config Fix For: 0.8.1 It appears that validation of configuration properties is performed in the ConsumerConfig and ProducerConfig constructors. This is generally bad practice as it couples object construction and validation. It also makes it difficult to mock these objects in unit tests. Ideally validation of the configuration properties should be separated from object construction and initiated by those that rely/use these config objects. http://misko.hevery.com/code-reviewers-guide/flaw-constructor-does-real-work/ -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-1046) Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x
[ https://issues.apache.org/jira/browse/KAFKA-1046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1046: --- Fix Version/s: 0.8 Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x -- Key: KAFKA-1046 URL: https://issues.apache.org/jira/browse/KAFKA-1046 Project: Kafka Issue Type: Improvement Affects Versions: 0.8 Reporter: Christopher Freeman Assignee: Christopher Freeman Fix For: 0.8 Attachments: kafka_2_10_refactor_0.8.patch, kafka_2_10_refactor.patch, Screen Shot 2013-09-09 at 9.34.09 AM.png I refactored the project such that it will compile against Scala 2.10.1. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-1036) Unable to rename replication offset checkpoint in windows
[ https://issues.apache.org/jira/browse/KAFKA-1036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13766549#comment-13766549 ] Jun Rao commented on KAFKA-1036: Hmm, not sure how to patch this since we close the writer before renaming the file. Unable to rename replication offset checkpoint in windows - Key: KAFKA-1036 URL: https://issues.apache.org/jira/browse/KAFKA-1036 Project: Kafka Issue Type: Bug Affects Versions: 0.8 Environment: windows Reporter: Timothy Chen Priority: Critical Labels: windows Although there was a fix for checkpoint file renaming on Windows that tries to delete the existing checkpoint file if the rename failed, I'm still seeing renaming errors on Windows even though the destination file doesn't exist. A bit of investigation shows that the rename fails because the Kafka JVM still holds a file lock on the tmp file. Attaching a patch that calls an explicit writer.close() so the lock is released and the rename can succeed.
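The fix pattern described in this issue, close the writer before renaming, and delete the destination first because a Windows rename will not overwrite an existing file, can be sketched as follows (a minimal sketch, not the Kafka OffsetCheckpoint code):

```java
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

public class CheckpointSwapDemo {
    // Write contents to target.tmp, then swap it into place.
    static void writeCheckpoint(File target, String contents) throws IOException {
        File tmp = new File(target.getAbsolutePath() + ".tmp");
        FileWriter writer = new FileWriter(tmp);
        try {
            writer.write(contents);
            writer.flush();
        } finally {
            // Close BEFORE renaming: on Windows, the JVM's open handle on the
            // tmp file otherwise blocks the rename (the bug in this issue).
            writer.close();
        }
        // Windows also refuses to rename onto an existing file.
        if (target.exists() && !target.delete()) {
            throw new IOException("could not delete " + target);
        }
        if (!tmp.renameTo(target)) {
            throw new IOException("could not rename " + tmp + " to " + target);
        }
    }

    public static void main(String[] args) throws IOException {
        File target = File.createTempFile("replication-offset-checkpoint", "");
        target.deleteOnExit();
        writeCheckpoint(target, "0\n1\n");
        System.out.println("swap ok, size=" + target.length());
    }
}
```

On POSIX systems the delete step is unnecessary (rename overwrites atomically), which is why the bug only surfaced on Windows.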
[jira] [Updated] (KAFKA-1044) change log4j to slf4j
[ https://issues.apache.org/jira/browse/KAFKA-1044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1044: --- Fix Version/s: 0.9 change log4j to slf4j -- Key: KAFKA-1044 URL: https://issues.apache.org/jira/browse/KAFKA-1044 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.8 Reporter: sjk Assignee: Jay Kreps Fix For: 0.9 Can you change log4j to slf4j? In my project I use Logback, which conflicts with log4j.
[jira] [Updated] (KAFKA-1046) Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x
[ https://issues.apache.org/jira/browse/KAFKA-1046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1046: --- Status: Patch Available (was: Open) Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x -- Key: KAFKA-1046 URL: https://issues.apache.org/jira/browse/KAFKA-1046 Project: Kafka Issue Type: Improvement Affects Versions: 0.8 Reporter: Christopher Freeman Assignee: Christopher Freeman Attachments: kafka_2_10_refactor_0.8.patch, kafka_2_10_refactor.patch, Screen Shot 2013-09-09 at 9.34.09 AM.png I refactored the project such that it will compile against Scala 2.10.1. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-956) High-level consumer fails to check topic metadata response for errors
[ https://issues.apache.org/jira/browse/KAFKA-956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13766581#comment-13766581 ] Jun Rao commented on KAFKA-956: --- This can probably be fixed as part of kafka-1030. High-level consumer fails to check topic metadata response for errors - Key: KAFKA-956 URL: https://issues.apache.org/jira/browse/KAFKA-956 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8 Reporter: Sam Meder Assignee: Neha Narkhede Priority: Blocker Fix For: 0.8 Attachments: consumer_metadata_fetch.patch In our environment we noticed that consumers would sometimes hang when started too close to starting the Kafka server. I tracked this down and it seems to be related to some code in rebalance (ZookeeperConsumerConnector.scala). In particular the following code seems problematic:

val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers, config.clientId, config.socketTimeoutMs, correlationId.getAndIncrement).topicsMetadata
val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
topicsMetadata.foreach(m => {
  val topic = m.topic
  val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
  partitionsPerTopicMap.put(topic, partitions)
})

The response is never checked for errors, so it may not actually contain any partition info! Rebalance goes its merry way, but doesn't know about any partitions so never assigns them...
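The missing check amounts to inspecting each topic's error code before trusting its partition list. A sketch in the spirit of the Scala snippet above, with simplified types and illustrative names (not the actual Kafka TopicMetadata class):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MetadataCheckDemo {
    static class TopicMetadata {
        final String topic;
        final short errorCode;
        final List<Integer> partitions;
        TopicMetadata(String topic, short errorCode, List<Integer> partitions) {
            this.topic = topic;
            this.errorCode = errorCode;
            this.partitions = partitions;
        }
    }

    static final short NO_ERROR = 0;

    // Only topics with errorCode == NO_ERROR contribute partitions; any other
    // code must be surfaced (or retried), never treated as "zero partitions",
    // which is what made the rebalance hang in this issue.
    static Map<String, List<Integer>> partitionsPerTopic(List<TopicMetadata> md) {
        Map<String, List<Integer>> result = new HashMap<>();
        for (TopicMetadata m : md) {
            if (m.errorCode == NO_ERROR) {
                result.put(m.topic, m.partitions);
            } else {
                throw new IllegalStateException(
                    "metadata error " + m.errorCode + " for topic " + m.topic);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<TopicMetadata> md = Arrays.asList(
            new TopicMetadata("good", NO_ERROR, Arrays.asList(0, 1)));
        System.out.println(partitionsPerTopic(md)); // {good=[0, 1]}
    }
}
```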
[jira] [Updated] (KAFKA-1027) Add documentation for system tests
[ https://issues.apache.org/jira/browse/KAFKA-1027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1027: --- Fix Version/s: (was: 0.8) 0.8.1 Add documentation for system tests -- Key: KAFKA-1027 URL: https://issues.apache.org/jira/browse/KAFKA-1027 Project: Kafka Issue Type: Sub-task Components: website Reporter: Tejas Patil Priority: Minor Labels: documentation Fix For: 0.8.1 Create a document describing following things: - Overview of Kafka system test framework - how to run the entire system test suite - how to run a specific system test - how to interpret the system test results - how to troubleshoot a failed test case - how to add new test module - how to add new test case -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-946) Kafka Hadoop Consumer fails when verifying message checksum
[ https://issues.apache.org/jira/browse/KAFKA-946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-946: -- Fix Version/s: 0.8 Kafka Hadoop Consumer fails when verifying message checksum --- Key: KAFKA-946 URL: https://issues.apache.org/jira/browse/KAFKA-946 Project: Kafka Issue Type: Bug Components: contrib Affects Versions: 0.8 Reporter: Sam Meder Priority: Critical Fix For: 0.8 Attachments: hadoop_consumer.patch The code tries to verify the checksum, but fails because the data available isn't the same. In KafkaETLContext:

protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
    if (_messageIt != null && _messageIt.hasNext()) {
        MessageAndOffset messageAndOffset = _messageIt.next();
        ByteBuffer buf = messageAndOffset.message().payload();
        int origSize = buf.remaining();
        byte[] bytes = new byte[origSize];
        buf.get(bytes, buf.position(), origSize);
        value.set(bytes, 0, origSize);
        key.set(_index, _offset, messageAndOffset.message().checksum());
        _offset = messageAndOffset.nextOffset(); // increase offset
        _count++; // increase count
        return true;
    } else return false;
}

Note that the message payload is used and the message checksum is included in the key. Then in SimpleKafkaETLMapper:

@Override
public void map(KafkaETLKey key, BytesWritable val, OutputCollector<LongWritable, Text> collector, Reporter reporter) throws IOException {
    byte[] bytes = KafkaETLUtils.getBytes(val);
    // check the checksum of message
    Message message = new Message(bytes);
    long checksum = key.getChecksum();
    if (checksum != message.checksum())
        throw new IOException("Invalid message checksum " + message.checksum() + ". Expected " + key + ".");

the Message object is initialized with the payload bytes and a new checksum is calculated. The problem is that the original message checksum also covers the key, so checksum verification fails...
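The mismatch is concrete: the stored checksum covers more of the message (including the key) than the mapper's recomputation, which sees only the payload. A sketch with plain CRC32 showing why the two values differ; the byte layout here is simplified and is not Kafka's exact wire format:

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class ChecksumMismatchDemo {
    // CRC over a sequence of byte chunks.
    static long crc(byte[]... parts) {
        CRC32 crc = new CRC32();
        for (byte[] p : parts) {
            crc.update(p);
        }
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] key = "k1".getBytes(StandardCharsets.UTF_8);
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        // The checksum stored with the message covers key + payload...
        long stored = crc(key, payload);
        // ...but the mapper rebuilds a Message from the payload alone.
        long recomputed = crc(payload);

        // Different inputs, different CRCs: verification fails.
        System.out.println(stored == recomputed);
    }
}
```

The patch attached to the issue resolves this by making both sides checksum the same bytes.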
[jira] [Updated] (KAFKA-1041) Number of file handles increases indefinitely in producer if broker host is unresolvable
[ https://issues.apache.org/jira/browse/KAFKA-1041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1041: --- Fix Version/s: 0.8.1 Moving to 0.8.1 since it's not very critical. Number of file handles increases indefinitely in producer if broker host is unresolvable Key: KAFKA-1041 URL: https://issues.apache.org/jira/browse/KAFKA-1041 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8 Environment: *unix* Reporter: Rajasekar Elango Assignee: Neha Narkhede Labels: features Fix For: 0.8.1 We found an issue where, if the broker host is unresolvable, the number of open file handles keeps increasing for every message we produce, eventually using up all available file handles in the operating system. If the broker itself is not running but its host name is resolvable, the open file handle count stays flat. lsof output shows the number of these open file handles continues to grow for every message we produce: java 19631 relango 81u sock 0,6 0t0 196966526 can't identify protocol. I can easily reproduce this with the console producer. If I run the console producer with the right hostname and the broker is not running, the console producer will exit after three tries. But if I run the console producer with an unresolvable broker host, it throws the exception below and continues to wait for user input; every time I enter a new message, it opens a socket and the file handle count keeps increasing. 
Here is Exception in producer ERROR fetching topic metadata for topics [Set(test-1378245487417)] from broker [ArrayBuffer(id:0,host:localhost1,port:6667)] failed (kafka.utils.Utils$) kafka.common.KafkaException: fetching topic metadata for topics [Set(test-1378245487417)] from broker [ArrayBuffer(id:0,host:localhost1,port:6667)] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) at kafka.utils.Utils$.swallow(Utils.scala:186) at kafka.utils.Logging$class.swallowError(Logging.scala:105) at kafka.utils.Utils$.swallowError(Utils.scala:45) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) at scala.collection.immutable.Stream.foreach(Stream.scala:526) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) Caused by: java.nio.channels.UnresolvedAddressException at sun.nio.ch.Net.checkAddress(Net.java:30) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:487) at kafka.network.BlockingChannel.connect(BlockingChannel.scala:59) at kafka.producer.SyncProducer.connect(SyncProducer.scala:151) at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:166) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:73) at kafka.producer.SyncProducer.send(SyncProducer.scala:117) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:37) ... 
12 more -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
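The leak mechanism in the stack trace above is visible in miniature with plain NIO: SocketChannel.open() allocates a file descriptor before any name resolution happens, so when connect() throws UnresolvedAddressException the channel must still be closed or the descriptor leaks on every retry. A minimal sketch of the safe pattern (not the actual BlockingChannel code; the hostname is made up, using the RFC 2606 reserved ".invalid" TLD):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

public class SafeConnect {
    // Attempts one blocking connect. The channel's file descriptor is
    // allocated by SocketChannel.open(), before any name resolution happens,
    // so it must be released even when connect() throws.
    static boolean tryConnect(String host, int port) throws IOException {
        SocketChannel channel = SocketChannel.open(); // descriptor allocated here
        try {
            channel.connect(new InetSocketAddress(host, port));
            return true;
        } catch (UnresolvedAddressException e) {
            return false; // hostname did not resolve
        } finally {
            channel.close(); // without this, every retry leaks one descriptor
        }
    }
}
```

A producer retry loop built on this pattern keeps a flat descriptor count whether the failure is an unresolvable host or a refused connection.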
[jira] [Updated] (KAFKA-1038) fetch response should use empty messageset instead of null when handling errors
[ https://issues.apache.org/jira/browse/KAFKA-1038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1038: --- Fix Version/s: 0.8 fetch response should use empty messageset instead of null when handling errors --- Key: KAFKA-1038 URL: https://issues.apache.org/jira/browse/KAFKA-1038 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: kafka-1038.patch Saw the following exception: Exception when handling request (kafka.server.KafkaRequestHandler) java.lang.NullPointerException at kafka.api.FetchResponsePartitionData.<init>(FetchResponse.scala:46) at kafka.api.FetchRequest$$anonfun$2.apply(FetchRequest.scala:158) at kafka.api.FetchRequest$$anonfun$2.apply(FetchRequest.scala:156) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233) at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347) at scala.collection.TraversableLike$class.map(TraversableLike.scala:233) at scala.collection.immutable.HashMap.map(HashMap.scala:38) at kafka.api.FetchRequest.handleError(FetchRequest.scala:156) at kafka.server.KafkaApis.handle(KafkaApis.scala:78) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) at java.lang.Thread.run(Thread.java:662) -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
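The shape of the fix in the summary is the familiar null-object pattern: error responses carry a shared empty message set rather than null, so downstream code can take sizes or iterate without a null check. A hypothetical sketch of the idea (the class and field names are illustrative, not the actual FetchResponse API):

```java
import java.util.Collections;
import java.util.List;

public class FetchResponsePartition {
    // shared sentinel: one immutable empty message set for all error responses
    static final List<byte[]> EMPTY_MESSAGE_SET = Collections.emptyList();

    final short errorCode;
    final List<byte[]> messages;

    FetchResponsePartition(short errorCode, List<byte[]> messages) {
        this.errorCode = errorCode;
        // never hold null: error paths are given the empty set instead
        this.messages = (messages == null) ? EMPTY_MESSAGE_SET : messages;
    }

    // safe for both normal and error responses; no null check needed
    int messageCount() {
        return messages.size();
    }
}
```

With a constructor like this, the handleError path that previously built a partition-data object around a null message set can no longer trip an NPE during construction or serialization.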
[jira] [Updated] (KAFKA-1038) fetch response should use empty messageset instead of null when handling errors
[ https://issues.apache.org/jira/browse/KAFKA-1038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1038: --- Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the review. Committed to 0.8. fetch response should use empty messageset instead of null when handling errors --- Key: KAFKA-1038 URL: https://issues.apache.org/jira/browse/KAFKA-1038 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: kafka-1038.patch Saw the following exception: Exception when handling request (kafka.server.KafkaRequestHandler) java.lang.NullPointerException at kafka.api.FetchResponsePartitionData.<init>(FetchResponse.scala:46) at kafka.api.FetchRequest$$anonfun$2.apply(FetchRequest.scala:158) at kafka.api.FetchRequest$$anonfun$2.apply(FetchRequest.scala:156) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233) at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347) at scala.collection.TraversableLike$class.map(TraversableLike.scala:233) at scala.collection.immutable.HashMap.map(HashMap.scala:38) at kafka.api.FetchRequest.handleError(FetchRequest.scala:156) at kafka.server.KafkaApis.handle(KafkaApis.scala:78) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) at java.lang.Thread.run(Thread.java:662) -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-1038) fetch response should use empty messageset instead of null when handling errors
[ https://issues.apache.org/jira/browse/KAFKA-1038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-1038. -- fetch response should use empty messageset instead of null when handling errors --- Key: KAFKA-1038 URL: https://issues.apache.org/jira/browse/KAFKA-1038 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: kafka-1038.patch Saw the following exception: Exception when handling request (kafka.server.KafkaRequestHandler) java.lang.NullPointerException at kafka.api.FetchResponsePartitionData.<init>(FetchResponse.scala:46) at kafka.api.FetchRequest$$anonfun$2.apply(FetchRequest.scala:158) at kafka.api.FetchRequest$$anonfun$2.apply(FetchRequest.scala:156) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233) at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347) at scala.collection.TraversableLike$class.map(TraversableLike.scala:233) at scala.collection.immutable.HashMap.map(HashMap.scala:38) at kafka.api.FetchRequest.handleError(FetchRequest.scala:156) at kafka.server.KafkaApis.handle(KafkaApis.scala:78) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) at java.lang.Thread.run(Thread.java:662) -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-1008) Unmap before resizing
[ https://issues.apache.org/jira/browse/KAFKA-1008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13774698#comment-13774698 ] Jun Rao commented on KAFKA-1008: Sorry, I missed that comment. +1 on v8. Unmap before resizing - Key: KAFKA-1008 URL: https://issues.apache.org/jira/browse/KAFKA-1008 Project: Kafka Issue Type: Bug Components: core, log Affects Versions: 0.8 Environment: Windows, Linux, Mac OS Reporter: Elizabeth Wei Assignee: Jay Kreps Labels: patch Fix For: 0.8 Attachments: KAFKA-0.8-1008-v7.patch, KAFKA-0.8-1008-v8.patch, KAFKA-1008-v6.patch, KAFKA-trunk-1008-v7.patch, unmap-v5.patch Original Estimate: 1h Remaining Estimate: 1h While I was studying how MappedByteBuffer works, I saw a sharing-violation runtime exception on Windows. I applied what I learned to generate a patch which uses an internal OpenJDK API to solve this problem. Following Jay's advice, I made a helper method called tryUnmap(). -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
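A tryUnmap() helper of the kind mentioned above has to go through a non-public JDK API, since java.nio offers no supported way to unmap a MappedByteBuffer. A hedged sketch of the idea (not the actual patch) uses reflection so it degrades gracefully when the internal cleaner is inaccessible:

```java
import java.lang.reflect.Method;
import java.nio.MappedByteBuffer;

public class Unmap {
    // Best-effort unmap via the JDK-internal cleaner; returns false when the
    // internal API is unavailable (e.g. JDK 9+ without --add-opens flags).
    static boolean tryUnmap(MappedByteBuffer buffer) {
        try {
            Method cleanerMethod = buffer.getClass().getMethod("cleaner");
            cleanerMethod.setAccessible(true);
            Object cleaner = cleanerMethod.invoke(buffer);
            Method cleanMethod = cleaner.getClass().getMethod("clean");
            cleanMethod.setAccessible(true);
            cleanMethod.invoke(cleaner);
            return true;
        } catch (Throwable t) {
            return false; // leave the mapping for the GC to reclaim
        }
    }
}
```

On Windows, releasing the user-mapped section this way is what lets a subsequent RandomAccessFile.setLength() resize succeed; after a successful unmap the buffer must never be touched again, or the JVM can crash.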
[jira] [Closed] (KAFKA-975) Leader not local for partition when partition is leader (kafka.common.NotLeaderForPartitionException)
[ https://issues.apache.org/jira/browse/KAFKA-975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-975. - Leader not local for partition when partition is leader (kafka.common.NotLeaderForPartitionException) - Key: KAFKA-975 URL: https://issues.apache.org/jira/browse/KAFKA-975 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 0.8 Environment: centos 6.4 Reporter: Dan Swanson I have a two-server Kafka cluster (dev003 and dev004). I am following the example from this URL but using two servers with a single Kafka instance each instead of one server with two instances. http://www.michael-noll.com/blog/2013/03/13/running-a-multi-broker-apache-kafka-cluster-on-a-single-node/ Using the following trunk version commit c27c768463a5dc6be113f2e5b3e00bf8d9d9d602 Author: David Arthur mum...@gmail.com Date: Thu Jul 11 15:34:57 2013 -0700 KAFKA-852, remove clientId from Offset{Fetch,Commit}Response. Reviewed by Jay. -- [2013-07-16 10:56:50,279] INFO [Kafka Server 3], started (kafka.server.KafkaServer) -- dan@linux-rr29:~/git-data/kafka-current-src bin/kafka-topics.sh --zookeeper dev003:2181 --create --topic dadj1 --partitions 1 --replication-factor 2 2>/dev/null Created topic dadj1. 
dan@linux-rr29:~/git-data/kafka-current-src --- [2013-07-16 10:56:57,946] INFO [Replica Manager on Broker 3]: Handling LeaderAndIsr request Name:LeaderAndIsrRequest;Version:0;Controller:4;ControllerEpoch:19;CorrelationId:12;ClientId:id_4-host_dev004-port_9092;PartitionState:(dadj1,0) -> (LeaderAndIsrInfo:(Leader:3,ISR:3,4,LeaderEpoch:0,ControllerEpoch:19),ReplicationFactor:2),AllReplicas:3,4);Leaders:id:3,host:dev003,port:9092 (kafka.server.ReplicaManager) [2013-07-16 10:56:57,959] INFO [ReplicaFetcherManager on broker 3] Removing fetcher for partition [dadj1,0] (kafka.server.ReplicaFetcherManager) [2013-07-16 10:57:21,196] WARN [KafkaApi-3] Produce request with correlation id 2 from client on partition [dadj1,0] failed due to Leader not local for partition [dadj1,0] on broker 3 (kafka.server.KafkaApis) - dan@linux-rr29:~/git-data/kafka-current-src bin/kafka-topics.sh --zookeeper dev003:2181 --describe --topic dadj1 2>/dev/null dadj1 configs: partitions: 1 topic: dadj1 partition: 0 leader: 3 replicas: 3,4 isr: 3,4 dan@linux-rr29:~/git-data/kafka-current-src Dev003's logs show that the server is elected as leader and has the correct id of 3, and ZooKeeper shows dev003 is the leader, but when I try to produce to the topic I get a failure because the server thinks it is not the leader. This occurs regardless of which server (dev003 or dev004) ends up the leader. 
Here is my config which is the same except for the broker id and host names [root@dev003 kafka-current-src]# grep -v -e '^#' -e '^$' config/server.properties broker.id=3 port=9092 host.name=dev003 num.network.threads=2 num.io.threads=2 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 log.dir=/opt/kafka/data/8.0/ num.partitions=1 log.flush.interval.messages=1 log.flush.interval.ms=1000 log.retention.hours=168 log.segment.bytes=536870912 log.cleanup.interval.mins=1 zookeeper.connect=10.200.8.61:2181,10.200.8.62:2181,10.200.8.63:2181 zookeeper.connection.timeout.ms=100 kafka.metrics.polling.interval.secs=5 kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter kafka.csv.metrics.dir=/tmp/kafka_metrics kafka.csv.metrics.reporter.enabled=false [root@dev003 kafka-current-src]# -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Resolved] (KAFKA-975) Leader not local for partition when partition is leader (kafka.common.NotLeaderForPartitionException)
[ https://issues.apache.org/jira/browse/KAFKA-975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-975. --- Resolution: Not A Problem Assignee: (was: Neha Narkhede) Carl, thanks for the update. Would you mind adding an FAQ in https://cwiki.apache.org/confluence/display/KAFKA/FAQ ? Leader not local for partition when partition is leader (kafka.common.NotLeaderForPartitionException) - Key: KAFKA-975 URL: https://issues.apache.org/jira/browse/KAFKA-975 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 0.8 Environment: centos 6.4 Reporter: Dan Swanson I have a two-server Kafka cluster (dev003 and dev004). I am following the example from this URL but using two servers with a single Kafka instance each instead of one server with two instances. http://www.michael-noll.com/blog/2013/03/13/running-a-multi-broker-apache-kafka-cluster-on-a-single-node/ Using the following trunk version commit c27c768463a5dc6be113f2e5b3e00bf8d9d9d602 Author: David Arthur mum...@gmail.com Date: Thu Jul 11 15:34:57 2013 -0700 KAFKA-852, remove clientId from Offset{Fetch,Commit}Response. Reviewed by Jay. -- [2013-07-16 10:56:50,279] INFO [Kafka Server 3], started (kafka.server.KafkaServer) -- dan@linux-rr29:~/git-data/kafka-current-src bin/kafka-topics.sh --zookeeper dev003:2181 --create --topic dadj1 --partitions 1 --replication-factor 2 2>/dev/null Created topic dadj1. 
dan@linux-rr29:~/git-data/kafka-current-src --- [2013-07-16 10:56:57,946] INFO [Replica Manager on Broker 3]: Handling LeaderAndIsr request Name:LeaderAndIsrRequest;Version:0;Controller:4;ControllerEpoch:19;CorrelationId:12;ClientId:id_4-host_dev004-port_9092;PartitionState:(dadj1,0) -> (LeaderAndIsrInfo:(Leader:3,ISR:3,4,LeaderEpoch:0,ControllerEpoch:19),ReplicationFactor:2),AllReplicas:3,4);Leaders:id:3,host:dev003,port:9092 (kafka.server.ReplicaManager) [2013-07-16 10:56:57,959] INFO [ReplicaFetcherManager on broker 3] Removing fetcher for partition [dadj1,0] (kafka.server.ReplicaFetcherManager) [2013-07-16 10:57:21,196] WARN [KafkaApi-3] Produce request with correlation id 2 from client on partition [dadj1,0] failed due to Leader not local for partition [dadj1,0] on broker 3 (kafka.server.KafkaApis) - dan@linux-rr29:~/git-data/kafka-current-src bin/kafka-topics.sh --zookeeper dev003:2181 --describe --topic dadj1 2>/dev/null dadj1 configs: partitions: 1 topic: dadj1 partition: 0 leader: 3 replicas: 3,4 isr: 3,4 dan@linux-rr29:~/git-data/kafka-current-src Dev003's logs show that the server is elected as leader and has the correct id of 3, and ZooKeeper shows dev003 is the leader, but when I try to produce to the topic I get a failure because the server thinks it is not the leader. This occurs regardless of which server (dev003 or dev004) ends up the leader. 
Here is my config which is the same except for the broker id and host names [root@dev003 kafka-current-src]# grep -v -e '^#' -e '^$' config/server.properties broker.id=3 port=9092 host.name=dev003 num.network.threads=2 num.io.threads=2 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 log.dir=/opt/kafka/data/8.0/ num.partitions=1 log.flush.interval.messages=1 log.flush.interval.ms=1000 log.retention.hours=168 log.segment.bytes=536870912 log.cleanup.interval.mins=1 zookeeper.connect=10.200.8.61:2181,10.200.8.62:2181,10.200.8.63:2181 zookeeper.connection.timeout.ms=100 kafka.metrics.polling.interval.secs=5 kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter kafka.csv.metrics.dir=/tmp/kafka_metrics kafka.csv.metrics.reporter.enabled=false [root@dev003 kafka-current-src]# -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-946) Kafka Hadoop Consumer fails when verifying message checksum
[ https://issues.apache.org/jira/browse/KAFKA-946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13774707#comment-13774707 ] Jun Rao commented on KAFKA-946: --- Sorry for not looking at this earlier. I think that we can include the fix in 0.8 since it's simple enough. It doesn't apply to 0.8 though. Could you rebase? Kafka Hadoop Consumer fails when verifying message checksum --- Key: KAFKA-946 URL: https://issues.apache.org/jira/browse/KAFKA-946 Project: Kafka Issue Type: Bug Components: contrib Affects Versions: 0.8 Reporter: Sam Meder Priority: Critical Fix For: 0.8 Attachments: hadoop_consumer.patch The code tries to verify the checksum, but fails because the data available isn't the same. In KafkaETLContext: protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException { if (_messageIt != null && _messageIt.hasNext()) { MessageAndOffset messageAndOffset = _messageIt.next(); ByteBuffer buf = messageAndOffset.message().payload(); int origSize = buf.remaining(); byte[] bytes = new byte[origSize]; buf.get(bytes, buf.position(), origSize); value.set(bytes, 0, origSize); key.set(_index, _offset, messageAndOffset.message().checksum()); _offset = messageAndOffset.nextOffset(); //increase offset _count ++; //increase count return true; } else return false; } Note that the message payload is used and the message checksum is included in the key. Then in SimpleKafkaETLMapper: @Override public void map(KafkaETLKey key, BytesWritable val, OutputCollector<LongWritable, Text> collector, Reporter reporter) throws IOException { byte[] bytes = KafkaETLUtils.getBytes(val); //check the checksum of message Message message = new Message(bytes); long checksum = key.getChecksum(); if (checksum != message.checksum()) throw new IOException("Invalid message checksum " + message.checksum() + ". Expected " + key + "."); the Message object is initialized with the payload bytes and a new checksum is calculated. 
The problem is that the original message checksum also contains the key so checksum verification fails... -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-946) Kafka Hadoop Consumer fails when verifying message checksum
[ https://issues.apache.org/jira/browse/KAFKA-946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-946: -- Resolution: Fixed Assignee: Sam Meder Status: Resolved (was: Patch Available) Thanks for the rebased patch. +1 and committed to 0.8. Kafka Hadoop Consumer fails when verifying message checksum --- Key: KAFKA-946 URL: https://issues.apache.org/jira/browse/KAFKA-946 Project: Kafka Issue Type: Bug Components: contrib Affects Versions: 0.8 Reporter: Sam Meder Assignee: Sam Meder Priority: Critical Fix For: 0.8 Attachments: hadoop_consumer_1.patch The code tries to verify the checksum, but fails because the data available isn't the same. In KafkaETLContext: protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException { if (_messageIt != null && _messageIt.hasNext()) { MessageAndOffset messageAndOffset = _messageIt.next(); ByteBuffer buf = messageAndOffset.message().payload(); int origSize = buf.remaining(); byte[] bytes = new byte[origSize]; buf.get(bytes, buf.position(), origSize); value.set(bytes, 0, origSize); key.set(_index, _offset, messageAndOffset.message().checksum()); _offset = messageAndOffset.nextOffset(); //increase offset _count ++; //increase count return true; } else return false; } Note that the message payload is used and the message checksum is included in the key. Then in SimpleKafkaETLMapper: @Override public void map(KafkaETLKey key, BytesWritable val, OutputCollector<LongWritable, Text> collector, Reporter reporter) throws IOException { byte[] bytes = KafkaETLUtils.getBytes(val); //check the checksum of message Message message = new Message(bytes); long checksum = key.getChecksum(); if (checksum != message.checksum()) throw new IOException("Invalid message checksum " + message.checksum() + ". Expected " + key + "."); the Message object is initialized with the payload bytes and a new checksum is calculated. 
The problem is that the original message checksum also contains the key so checksum verification fails... -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Closed] (KAFKA-946) Kafka Hadoop Consumer fails when verifying message checksum
[ https://issues.apache.org/jira/browse/KAFKA-946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-946. - Kafka Hadoop Consumer fails when verifying message checksum --- Key: KAFKA-946 URL: https://issues.apache.org/jira/browse/KAFKA-946 Project: Kafka Issue Type: Bug Components: contrib Affects Versions: 0.8 Reporter: Sam Meder Assignee: Sam Meder Priority: Critical Fix For: 0.8 Attachments: hadoop_consumer_1.patch The code tries to verify the checksum, but fails because the data available isn't the same. In KafkaETLContext: protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException { if (_messageIt != null && _messageIt.hasNext()) { MessageAndOffset messageAndOffset = _messageIt.next(); ByteBuffer buf = messageAndOffset.message().payload(); int origSize = buf.remaining(); byte[] bytes = new byte[origSize]; buf.get(bytes, buf.position(), origSize); value.set(bytes, 0, origSize); key.set(_index, _offset, messageAndOffset.message().checksum()); _offset = messageAndOffset.nextOffset(); //increase offset _count ++; //increase count return true; } else return false; } Note that the message payload is used and the message checksum is included in the key. Then in SimpleKafkaETLMapper: @Override public void map(KafkaETLKey key, BytesWritable val, OutputCollector<LongWritable, Text> collector, Reporter reporter) throws IOException { byte[] bytes = KafkaETLUtils.getBytes(val); //check the checksum of message Message message = new Message(bytes); long checksum = key.getChecksum(); if (checksum != message.checksum()) throw new IOException("Invalid message checksum " + message.checksum() + ". Expected " + key + "."); the Message object is initialized with the payload bytes and a new checksum is calculated. The problem is that the original message checksum also contains the key so checksum verification fails... -- This message is automatically generated by JIRA. 
If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-1065) IO exception on windows when high throughput of messages
[ https://issues.apache.org/jira/browse/KAFKA-1065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13776426#comment-13776426 ] Jun Rao commented on KAFKA-1065: Could you try the patch in https://issues.apache.org/jira/browse/KAFKA-1008? I think it's fixing the same issue on windows. IO exception on windows when high throughput of messages Key: KAFKA-1065 URL: https://issues.apache.org/jira/browse/KAFKA-1065 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.8 Environment: Windows 7 64bit Reporter: Carl Austin Assignee: Jun Rao When a large number of messages are sent per second to a broker on Windows a memory mapping exception occurs and kills Kafka. The exception follows : {code}kafka.common.KafkaStorageException: I/O exception in append to log 'test-0' at kafka.log.Log.append(Log.scala:349) at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:340) at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:236) at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:228) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) at scala.collection.Iterator$class.foreach(Iterator.scala:631) at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:80) at scala.collection.TraversableLike$class.map(TraversableLike.scala:206) at scala.collection.mutable.HashMap.map(HashMap.scala:39) at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:228) at 
kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:162) at kafka.server.KafkaApis.handle(KafkaApis.scala:66) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) at java.lang.Thread.run(Thread.java:662) Caused by: java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open at java.io.RandomAccessFile.setLength(Native Method) at kafka.log.OffsetIndex.liftedTree2$1(OffsetIndex.scala:263) at kafka.log.OffsetIndex.resize(OffsetIndex.scala:262) at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:247) at kafka.log.Log.rollToOffset(Log.scala:518) at kafka.log.Log.roll(Log.scala:502) at kafka.log.Log.maybeRoll(Log.scala:484) at kafka.log.Log.append(Log.scala:297) ... 19 more{code} This seems to have been mentioned in the past on the Kafka mail list and is an issue related to http://bugs.sun.com/view_bug.do?bug_id=4724038. Unfortunately this means that we cannot use Kafka on Windows. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-998) Producer should not retry on non-recoverable error codes
[ https://issues.apache.org/jira/browse/KAFKA-998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13778883#comment-13778883 ] Jun Rao commented on KAFKA-998: --- Jason, This patch just won't retry sending the data when hitting a MessageTooLargeException. It doesn't really address your main concern, which is that the caller doesn't know the real cause of the failure. Addressing this issue completely will need some more thought in the producer logic and the changes required may be non-trivial. So, I am not sure if we should do this in 0.8. Producer should not retry on non-recoverable error codes Key: KAFKA-998 URL: https://issues.apache.org/jira/browse/KAFKA-998 Project: Kafka Issue Type: Bug Affects Versions: 0.8, 0.8.1 Reporter: Joel Koshy Assignee: Guozhang Wang Attachments: KAFKA-998.v1.patch Based on a discussion with Guozhang. The producer currently retries on all error codes (including MessageSizeTooLarge, which is pointless to retry on). This can slow down the producer unnecessarily. If at all we want to retry on that error code we would need to retry with a smaller batch size, but that's a separate discussion. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-998) Producer should not retry on non-recoverable error codes
[ https://issues.apache.org/jira/browse/KAFKA-998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13779636#comment-13779636 ] Jun Rao commented on KAFKA-998: --- My feeling is that it may not be very easy to do a quick fix. Currently, the cause exceptions are eaten at several places just so that we can pass back unsuccessfully sent messages. Producer should not retry on non-recoverable error codes Key: KAFKA-998 URL: https://issues.apache.org/jira/browse/KAFKA-998 Project: Kafka Issue Type: Bug Affects Versions: 0.8, 0.8.1 Reporter: Joel Koshy Assignee: Guozhang Wang Attachments: KAFKA-998.v1.patch Based on a discussion with Guozhang. The producer currently retries on all error codes (including messagesizetoolarge which is pointless to retry on). This can slow down the producer unnecessarily. If at all we want to retry on that error code we would need to retry with a smaller batch size, but that's a separate discussion. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
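The change discussed in this thread reduces to classifying error codes by whether a blind resend can ever help. A hypothetical classifier along those lines (the error names are illustrative, not Kafka's actual ErrorMapping constants):

```java
public class RetryPolicy {
    // illustrative error codes, not Kafka's actual ErrorMapping constants
    enum ProducerError { MESSAGE_SIZE_TOO_LARGE, NOT_LEADER_FOR_PARTITION, REQUEST_TIMED_OUT }

    // Resending the identical batch can never fix a size violation, so
    // retrying it only burns the retry budget and slows the producer down.
    static boolean isRetriable(ProducerError error) {
        switch (error) {
            case MESSAGE_SIZE_TOO_LARGE:
                return false;
            default:
                return true; // transient: leadership moves, timeouts pass
        }
    }
}
```

The retry loop would consult such a predicate before resending, and surface non-retriable failures to the caller immediately; per the comments above, propagating the real cause instead of swallowing it is the larger, separate change.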
[jira] [Created] (KAFKA-1068) OfflinePartitionCount metrics may be incorrect after the controller failover
Jun Rao created KAFKA-1068: -- Summary: OfflinePartitionCount metrics may be incorrect after the controller failover Key: KAFKA-1068 URL: https://issues.apache.org/jira/browse/KAFKA-1068 Project: Kafka Issue Type: Bug Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Updated] (KAFKA-1068) OfflinePartitionCount metrics may be incorrect after the controller failover
[ https://issues.apache.org/jira/browse/KAFKA-1068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1068: --- Attachment: KAFKA-1068.patch OfflinePartitionCount metrics may be incorrect after the controller failover Key: KAFKA-1068 URL: https://issues.apache.org/jira/browse/KAFKA-1068 Project: Kafka Issue Type: Bug Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: KAFKA-1068.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Commented] (KAFKA-1068) OfflinePartitionCount metrics may be incorrect after the controller failover
[ https://issues.apache.org/jira/browse/KAFKA-1068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13781530#comment-13781530 ] Jun Rao commented on KAFKA-1068: Created reviewboard https://reviews.apache.org/r/14399/ OfflinePartitionCount metrics may be incorrect after the controller failover Key: KAFKA-1068 URL: https://issues.apache.org/jira/browse/KAFKA-1068 Project: Kafka Issue Type: Bug Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: KAFKA-1068.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Updated] (KAFKA-1068) OfflinePartitionCount metrics may be incorrect after the controller failover
[ https://issues.apache.org/jira/browse/KAFKA-1068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1068: --- Status: Patch Available (was: Open) OfflinePartitionCount metrics may be incorrect after the controller failover Key: KAFKA-1068 URL: https://issues.apache.org/jira/browse/KAFKA-1068 Project: Kafka Issue Type: Bug Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: KAFKA-1068.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Resolved] (KAFKA-1065) IO exception on windows when high throughput of messages
[ https://issues.apache.org/jira/browse/KAFKA-1065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-1065. Resolution: Duplicate Close this since it duplicates kafka-1008. IO exception on windows when high throughput of messages Key: KAFKA-1065 URL: https://issues.apache.org/jira/browse/KAFKA-1065 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.8 Environment: Windows 7 64bit Reporter: Carl Austin Assignee: Jun Rao When a large number of messages are sent per second to a broker on Windows a memory mapping exception occurs and kills Kafka. The exception follows : {code}kafka.common.KafkaStorageException: I/O exception in append to log 'test-0' at kafka.log.Log.append(Log.scala:349) at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:340) at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:236) at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:228) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) at scala.collection.Iterator$class.foreach(Iterator.scala:631) at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:80) at scala.collection.TraversableLike$class.map(TraversableLike.scala:206) at scala.collection.mutable.HashMap.map(HashMap.scala:39) at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:228) at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:162) at kafka.server.KafkaApis.handle(KafkaApis.scala:66) at 
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) at java.lang.Thread.run(Thread.java:662) Caused by: java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open at java.io.RandomAccessFile.setLength(Native Method) at kafka.log.OffsetIndex.liftedTree2$1(OffsetIndex.scala:263) at kafka.log.OffsetIndex.resize(OffsetIndex.scala:262) at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:247) at kafka.log.Log.rollToOffset(Log.scala:518) at kafka.log.Log.roll(Log.scala:502) at kafka.log.Log.maybeRoll(Log.scala:484) at kafka.log.Log.append(Log.scala:297) ... 19 more{code} This seems to have been mentioned in the past on the Kafka mail list and is an issue related to http://bugs.sun.com/view_bug.do?bug_id=4724038. Unfortunately this means that we cannot use Kafka on Windows. -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Created] (KAFKA-1071) The random partition selected in DefaultEventHandler is not random across producer instances
Jun Rao created KAFKA-1071: -- Summary: The random partition selected in DefaultEventHandler is not random across producer instances Key: KAFKA-1071 URL: https://issues.apache.org/jira/browse/KAFKA-1071 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Guozhang Wang Fix For: 0.8 In DefaultEventHandler, partitionCounter is initialized to 0. If multiple producers start at about the same time, they likely will always select the same partition. -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Commented] (KAFKA-1071) The random partition selected in DefaultEventHandler is not random across producer instances
[ https://issues.apache.org/jira/browse/KAFKA-1071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13784816#comment-13784816 ] Jun Rao commented on KAFKA-1071: Thanks for the patch. Would it be better to just get a random int in getPartition()? In the current approach, if multiple producer instances somehow get into lock-step, they will never get out of it as long as partitions are always available. The random partition selected in DefaultEventHandler is not random across producer instances Key: KAFKA-1071 URL: https://issues.apache.org/jira/browse/KAFKA-1071 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Guozhang Wang Fix For: 0.8 Attachments: KAFKA-1071.v1.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
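The lock-step concern above can be made concrete with a minimal Java sketch (hypothetical names, not Kafka's actual DefaultEventHandler code): two producers whose partition counters both start at 0 pick the identical partition at every step, while a fresh random pick per call decorrelates them immediately.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch contrasting the two partition-selection
// strategies discussed in KAFKA-1071. Not the actual Kafka patch.
class PartitionPicker {

    // Counter-based, initialized to 0: every instance started around
    // the same time walks the identical sequence 0, 1, 2, ..., so the
    // instances stay in lock-step on the same partition.
    private final AtomicInteger partitionCounter = new AtomicInteger(0);

    int counterBased(int numPartitions) {
        // floorMod keeps the result non-negative even after overflow
        return Math.floorMod(partitionCounter.getAndIncrement(), numPartitions);
    }

    // Fresh random int per call, as suggested in the comment above:
    // independent instances cannot get stuck in lock-step, even when
    // all partitions remain available.
    int randomBased(int numPartitions) {
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }
}
```

The trade-off is that a per-call random pick gives up the strict round-robin spread the counter provides within a single instance, in exchange for independence across instances.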
[jira] [Updated] (KAFKA-1071) The random partition selected in DefaultEventHandler is not random across producer instances
[ https://issues.apache.org/jira/browse/KAFKA-1071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1071: --- Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for patch v2. +1. Committed to 0.8. The random partition selected in DefaultEventHandler is not random across producer instances Key: KAFKA-1071 URL: https://issues.apache.org/jira/browse/KAFKA-1071 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Guozhang Wang Fix For: 0.8 Attachments: KAFKA-1071.v1.patch, KAFKA-1071.v2.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Commented] (KAFKA-881) Kafka broker not respecting log.roll.hours
[ https://issues.apache.org/jira/browse/KAFKA-881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13786295#comment-13786295 ] Jun Rao commented on KAFKA-881: --- Thanks for the patch. I am not sure if the patch works though. In log.append(), if the system time passes the rolling interval just after the maybeRoll() call but before segment.append(), the append will move the lastAppend time into the next rolling interval, so the next append won't trigger log rolling. Kafka broker not respecting log.roll.hours -- Key: KAFKA-881 URL: https://issues.apache.org/jira/browse/KAFKA-881 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.7.2 Reporter: Dan F Assignee: Jay Kreps Attachments: kafka-roll-0.8.patch, kafka-roll.again.patch, kafka_roll.patch We are running Kafka 0.7.2. We set log.roll.hours=1. I hoped that meant logs would be rolled every hour, or more. Only, sometimes logs that are many hours (sometimes days) old have more data added to them. This perturbs our systems for reasons I won't get in to. I don't know Scala or Kafka well, but I have a proposal for why this might happen: upon restart, a broker forgets when its log files have been appended to (firstAppendTime). Then a potentially infinite amount of time later, the restarted broker receives another message for the particular (topic, partition), and starts the clock again. It will then roll over that log after an hour. https://svn.apache.org/repos/asf/kafka/branches/0.7/core/src/main/scala/kafka/server/KafkaConfig.scala says: /* the maximum time before a new log segment is rolled out */ val logRollHours = Utils.getIntInRange(props, log.roll.hours, 24*7, (1, Int.MaxValue)) https://svn.apache.org/repos/asf/kafka/branches/0.7/core/src/main/scala/kafka/log/Log.scala has maybeRoll, which needs segment.firstAppendTime defined. It also has updateFirstAppendTime() which says if it's empty, then set it. 
If my hypothesis is correct about why it is happening, here is a case where rolling is longer than an hour, even on a high volume topic: - write to a topic for 20 minutes - restart the broker - wait for 5 days - write to a topic for 20 minutes - restart the broker - write to a topic for an hour The rollover time was now 5 days, 1 hour, 40 minutes. You can make it as long as you want. Proposed solution: The very easiest thing to do would be to have Kafka re-initialize firstAppendTime with the file creation time. Unfortunately, there is no file creation time in UNIX. There is ctime, change time, updated when a file's inode information is changed. One solution is to embed the firstAppendTime in the filename (say, seconds since epoch). Then when you open it you could reset firstAppendTime to exactly what it really was. This ignores clock drift or resetting. One could set firstAppendTime to min(filename-based time, current time). A second solution is to make the Kafka log roll over at specific times, regardless of when the file was created. Conceptually, time can be divided into windows of size log.rollover.hours since epoch (UNIX time 0, 1970). So, when firstAppendTime is empty, compute the next rollover time (say, next = (hours since epoch) % (log.rollover.hours) + log.rollover.hours). If the file mtime (last modified) is before the current rollover window ( (next-log.rollover.hours) .. next ), roll it over right away. Otherwise, roll over when you cross next, and reset next. A third solution (not perfect, but an approximation at least) would be not to write to a segment if firstAppendTime is not defined and the timestamp on the file is more than log.roll.hours old. There are probably other solutions. -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Created] (KAFKA-1073) CheckReassignmentStatus is broken
Jun Rao created KAFKA-1073: -- Summary: CheckReassignmentStatus is broken Key: KAFKA-1073 URL: https://issues.apache.org/jira/browse/KAFKA-1073 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 CheckReassignmentStatus is supposed to take the output from ReassignPartitionsCommand as the input. However, the output from ReassignPartitionsCommand is not valid JSON. It's also not clear how to prepare the input to CheckReassignmentStatus. -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Commented] (KAFKA-1073) CheckReassignmentStatus is broken
[ https://issues.apache.org/jira/browse/KAFKA-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13786649#comment-13786649 ] Jun Rao commented on KAFKA-1073: Created reviewboard https://reviews.apache.org/r/14496/ CheckReassignmentStatus is broken - Key: KAFKA-1073 URL: https://issues.apache.org/jira/browse/KAFKA-1073 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: KAFKA-1073.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Updated] (KAFKA-1073) CheckReassignmentStatus is broken
[ https://issues.apache.org/jira/browse/KAFKA-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1073: --- Attachment: KAFKA-1073.patch CheckReassignmentStatus is broken - Key: KAFKA-1073 URL: https://issues.apache.org/jira/browse/KAFKA-1073 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: KAFKA-1073.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Updated] (KAFKA-1073) CheckReassignmentStatus is broken
[ https://issues.apache.org/jira/browse/KAFKA-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1073: --- Status: Patch Available (was: Open) CheckReassignmentStatus is broken - Key: KAFKA-1073 URL: https://issues.apache.org/jira/browse/KAFKA-1073 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: KAFKA-1073.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Created] (KAFKA-1074) Reassign partitions should delete the old replicas from disk
Jun Rao created KAFKA-1074: -- Summary: Reassign partitions should delete the old replicas from disk Key: KAFKA-1074 URL: https://issues.apache.org/jira/browse/KAFKA-1074 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Fix For: 0.8.1 Currently, after reassigning replicas to other brokers, the old replicas are not removed from disk and have to be deleted manually. -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Updated] (KAFKA-1073) CheckReassignmentStatus is broken
[ https://issues.apache.org/jira/browse/KAFKA-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1073: --- Attachment: KAFKA-1073_2013-10-05_11:02:45.patch CheckReassignmentStatus is broken - Key: KAFKA-1073 URL: https://issues.apache.org/jira/browse/KAFKA-1073 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: KAFKA-1073_2013-10-05_11:02:45.patch, KAFKA-1073.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Updated] (KAFKA-1076) system tests in 0.8 are broken due to wrong log4j config
[ https://issues.apache.org/jira/browse/KAFKA-1076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1076: --- Status: Patch Available (was: Open) system tests in 0.8 are broken due to wrong log4j config Key: KAFKA-1076 URL: https://issues.apache.org/jira/browse/KAFKA-1076 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: KAFKA-1076.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Commented] (KAFKA-1076) system tests in 0.8 are broken due to wrong log4j config
[ https://issues.apache.org/jira/browse/KAFKA-1076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13787287#comment-13787287 ] Jun Rao commented on KAFKA-1076: Created reviewboard https://reviews.apache.org/r/14511/ system tests in 0.8 are broken due to wrong log4j config Key: KAFKA-1076 URL: https://issues.apache.org/jira/browse/KAFKA-1076 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: KAFKA-1076.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Updated] (KAFKA-1076) system tests in 0.8 are broken due to wrong log4j config
[ https://issues.apache.org/jira/browse/KAFKA-1076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1076: --- Attachment: KAFKA-1076.patch system tests in 0.8 are broken due to wrong log4j config Key: KAFKA-1076 URL: https://issues.apache.org/jira/browse/KAFKA-1076 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: KAFKA-1076.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Commented] (KAFKA-881) Kafka broker not respecting log.roll.hours
[ https://issues.apache.org/jira/browse/KAFKA-881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13787655#comment-13787655 ] Jun Rao commented on KAFKA-881: --- How about the following: In Log, maintain an expectedRollingEpoch, which is computed from the time of the first append (or, during restart, from the last modified time if the last segment is not empty) divided by rollIntervalMs. Once computed, expectedRollingEpoch doesn't change until the log rolls. We can then compare expectedRollingEpoch with the current time divided by rollIntervalMs to determine whether a log needs to be rolled. Kafka broker not respecting log.roll.hours -- Key: KAFKA-881 URL: https://issues.apache.org/jira/browse/KAFKA-881 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.7.2 Reporter: Dan F Assignee: Jay Kreps Attachments: kafka-roll-0.8.patch, kafka-roll.again.patch, kafka_roll.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
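The epoch-based roll check proposed in the KAFKA-881 discussion can be sketched in a few lines of Java (hypothetical names, not the actual Kafka patch): the epoch is wall-clock time divided by the roll interval, fixed at the first append (or recovered from the segment's last-modified time on restart), and the segment rolls once the current epoch moves past it.

```java
// Hypothetical sketch of the expectedRollingEpoch idea from KAFKA-881.
// Assumed names (RollCheck, onFirstAppend, shouldRoll) are illustrative only.
class RollCheck {
    private final long rollIntervalMs;
    private long expectedRollingEpoch = -1; // -1 = segment has no appends yet

    RollCheck(long rollIntervalMs) {
        this.rollIntervalMs = rollIntervalMs;
    }

    // Called on the first append, or on restart with the segment's
    // last-modified time if the last segment is not empty.
    void onFirstAppend(long timeMs) {
        if (expectedRollingEpoch < 0)
            expectedRollingEpoch = timeMs / rollIntervalMs;
    }

    // Roll as soon as the current epoch passes the expected one. The
    // boundary is absolute, so later appends cannot push it further out.
    boolean shouldRoll(long nowMs) {
        return expectedRollingEpoch >= 0
            && nowMs / rollIntervalMs > expectedRollingEpoch;
    }
}
```

Because the roll boundary is an absolute epoch rather than a sliding lastAppend timestamp, an append landing just before the boundary cannot defer the next roll, which avoids the race noted in the earlier review comment.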
[jira] [Updated] (KAFKA-1076) system tests in 0.8 are broken due to wrong log4j config
[ https://issues.apache.org/jira/browse/KAFKA-1076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1076: --- Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the review. Committed to 0.8. system tests in 0.8 are broken due to wrong log4j config Key: KAFKA-1076 URL: https://issues.apache.org/jira/browse/KAFKA-1076 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: KAFKA-1076.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Closed] (KAFKA-1076) system tests in 0.8 are broken due to wrong log4j config
[ https://issues.apache.org/jira/browse/KAFKA-1076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-1076. -- system tests in 0.8 are broken due to wrong log4j config Key: KAFKA-1076 URL: https://issues.apache.org/jira/browse/KAFKA-1076 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: KAFKA-1076.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Updated] (KAFKA-1073) CheckReassignmentStatus is broken
[ https://issues.apache.org/jira/browse/KAFKA-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1073: --- Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the review. Committed to 0.8. CheckReassignmentStatus is broken - Key: KAFKA-1073 URL: https://issues.apache.org/jira/browse/KAFKA-1073 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: KAFKA-1073_2013-10-05_11:02:45.patch, KAFKA-1073.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Closed] (KAFKA-1073) CheckReassignmentStatus is broken
[ https://issues.apache.org/jira/browse/KAFKA-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-1073. -- CheckReassignmentStatus is broken - Key: KAFKA-1073 URL: https://issues.apache.org/jira/browse/KAFKA-1073 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: KAFKA-1073_2013-10-05_11:02:45.patch, KAFKA-1073.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Closed] (KAFKA-1075) Consumer will not rebalance upon topic partition change
[ https://issues.apache.org/jira/browse/KAFKA-1075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-1075. -- Consumer will not rebalance upon topic partition change --- Key: KAFKA-1075 URL: https://issues.apache.org/jira/browse/KAFKA-1075 Project: Kafka Issue Type: Bug Affects Versions: 0.8 Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.8 Attachments: KAFKA-1075_2013-10-07_10:25:01.patch, KAFKA-1075_2013-10-07_10:58:40.patch, KAFKA-1075.patch Due to the watcher and zk data structure mismatch, consumer will not rebalance upon topic partition change. Details: ZK data structure for topic partitions: /brokers/topics/[topic-name]/partitions/[partition-id]/state When new partitions are added, it will change the data in /brokers/topics/[topic-name], however the ZK client is watching on child change of /brokers/topics/[topic-name], which will always be a single child 'partitions'. Proposal: add a data change listener, which will trigger rebalance upon data changes on /brokers/topics/[topic-name]. -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Resolved] (KAFKA-1075) Consumer will not rebalance upon topic partition change
[ https://issues.apache.org/jira/browse/KAFKA-1075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-1075. Resolution: Fixed Fix Version/s: 0.8 Thanks for the patch. +1 and committed to 0.8. Consumer will not rebalance upon topic partition change --- Key: KAFKA-1075 URL: https://issues.apache.org/jira/browse/KAFKA-1075 Project: Kafka Issue Type: Bug Affects Versions: 0.8 Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.8 Attachments: KAFKA-1075_2013-10-07_10:25:01.patch, KAFKA-1075_2013-10-07_10:58:40.patch, KAFKA-1075.patch -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Updated] (KAFKA-1067) the default partitioner should be randomizing messages and a new partition for the meta refresh requirements created
[ https://issues.apache.org/jira/browse/KAFKA-1067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1067: --- Fix Version/s: (was: 0.8) 0.8.1 the default partitioner should be randomizing messages and a new partition for the meta refresh requirements created Key: KAFKA-1067 URL: https://issues.apache.org/jira/browse/KAFKA-1067 Project: Kafka Issue Type: Bug Reporter: Joe Stein Fix For: 0.8.1 -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Updated] (KAFKA-1063) run log cleanup at startup
[ https://issues.apache.org/jira/browse/KAFKA-1063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1063: --- Fix Version/s: (was: 0.8) 0.8.1 Moving to 0.8.1 since it's a minor issue and there is a manual workaround. run log cleanup at startup -- Key: KAFKA-1063 URL: https://issues.apache.org/jira/browse/KAFKA-1063 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8 Reporter: paul mackles Priority: Minor Fix For: 0.8.1 Jun suggested I file this ticket to have the brokers start running cleanup at start. Here is the scenario that precipitated it: We ran into a situation on our dev cluster (3 nodes, v0.8) where we ran out of disk on one of the nodes . As expected, the broker shut itself down and all of the clients switched over to the other nodes. So far so good. To free up disk space, I reduced log.retention.hours to something more manageable (from 172 to 12). I did this on all 3 nodes. Since the other 2 nodes were running OK, I first tried to restart the node which ran out of disk. Unfortunately, it kept shutting itself down due to the full disk. From the logs, I think this was because it was trying to sync-up the replicas it was responsible for and of course couldn't due to the lack of disk space. My hope was that upon restart, it would see the new retention settings and free up a bunch of disk space before trying to do any syncs. I then went and restarted the other 2 nodes. They both picked up the new retention settings and freed up a bunch of storage as a result. I then went back and tried to restart the 3rd node but to no avail. It still had problems with the full disks. I thought about trying to reassign partitions so that the node in question had less to manage but that turned out to be a hassle so I wound up manually deleting some of the old log/segment files. The broker seemed to come back fine after that but that's not something I would want to do on a production server. 
We obviously need better monitoring/alerting to avoid this situation altogether, but I am wondering if the order of operations at startup could/should be changed to better account for scenarios like this. Or maybe a utility to remove old logs after changing ttl? Did I miss a better way to handle this? Original email thread is here: http://mail-archives.apache.org/mod_mbox/kafka-users/201309.mbox/%3cce6365ae.82d66%25pmack...@adobe.com%3e -- This message was sent by Atlassian JIRA (v6.1#6144)
[jira] [Updated] (KAFKA-1036) Unable to rename replication offset checkpoint in windows
[ https://issues.apache.org/jira/browse/KAFKA-1036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1036: --- Affects Version/s: (was: 0.8) 0.8.1 Fix Version/s: (was: 0.8) 0.8.1 This seems to be only affecting trunk. So, moving to 0.8.1. Unable to rename replication offset checkpoint in windows - Key: KAFKA-1036 URL: https://issues.apache.org/jira/browse/KAFKA-1036 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1 Environment: windows Reporter: Timothy Chen Priority: Critical Labels: windows Fix For: 0.8.1 Although there was a fix for checkpoint file renaming on Windows that tries to delete the existing checkpoint file if the rename failed, I'm still seeing renaming errors on Windows even though the destination file doesn't exist. A bit of investigation shows that the rename fails because the Kafka JVM still holds a file lock on the tmp file. Attaching a patch that calls an explicit writer.close() so the lock is released and the rename can succeed. -- This message was sent by Atlassian JIRA (v6.1#6144)