[jira] [Updated] (KAFKA-965) merge c39d37e9dd97bf2462ffbd1a96c0b2cb05034bae from 0.8 to trunk
[ https://issues.apache.org/jira/browse/KAFKA-965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-965:
--------------------------
    Attachment: kafka-965.patch

Attached a patch. The following files have to be merged manually.

# Unmerged paths:
#   (use "git add/rm <file>..." as appropriate to mark resolution)
#
#       both modified:      core/src/main/scala/kafka/admin/AdminUtils.scala
#       both modified:      core/src/main/scala/kafka/api/RequestKeys.scala
#       both modified:      core/src/main/scala/kafka/log/Log.scala
#       both modified:      core/src/main/scala/kafka/log/LogSegment.scala
#       deleted by us:      core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
#       both modified:      core/src/main/scala/kafka/server/KafkaApis.scala
#       both modified:      core/src/main/scala/kafka/server/KafkaConfig.scala
#       both modified:      core/src/main/scala/kafka/server/KafkaServer.scala
#       both modified:      core/src/main/scala/kafka/server/ReplicaManager.scala
#       both modified:      core/src/main/scala/kafka/utils/ZkUtils.scala
#       both modified:      core/src/test/scala/unit/kafka/admin/AdminTest.scala
#       both modified:      core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
#       both modified:      core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
#       both modified:      core/src/test/scala/unit/kafka/producer/ProducerTest.scala
#       both modified:      core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
#       both modified:      core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
#       both modified:      core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
#
# Changed but not updated:
#   (use "git add <file>..." to update what will be committed)
#   (use "git checkout -- <file>..." to discard changes in working directory)
#
#       modified:   core/src/main/scala/kafka/server/OffsetCheckpoint.scala

merge c39d37e9dd97bf2462ffbd1a96c0b2cb05034bae from 0.8 to trunk
----------------------------------------------------------------

                Key: KAFKA-965
                URL: https://issues.apache.org/jira/browse/KAFKA-965
            Project: Kafka
         Issue Type: Task
         Components: core
   Affects Versions: 0.8.1
           Reporter: Jun Rao
           Assignee: Jun Rao
        Attachments: kafka-965.patch
[jira] [Updated] (KAFKA-965) merge c39d37e9dd97bf2462ffbd1a96c0b2cb05034bae from 0.8 to trunk
[ https://issues.apache.org/jira/browse/KAFKA-965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-965:
--------------------------
    Status: Patch Available  (was: In Progress)
Re: beta2 = Maven Repository Signed and Staged
I agree it is very low touch, since it is only a change when building for publishing and does not affect the jar. I don't have objections, and unless anyone else objects I will publish and see whether more errors/issues come up, or else 0.8.0-beta1 will be available from maven. I should be able to do this later tonight or in the morning unless someone has an issue with proceeding.

On Sun, Jul 7, 2013 at 5:40 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

That's awesome! Personally I think it is okay if there is a slight cosmetic difference as long as we fix it before the proper 0.8 release.

-Jay

On Sun, Jul 7, 2013 at 6:55 AM, Joe Stein <crypt...@gmail.com> wrote:

So, I think we have everything now for a successful publishing to the maven repo of the 0.8.0-betaX Kafka build on Scala 2.8.0, 2.8.2, 2.9.1, and 2.9.2.

Since I had to make a slight code change (https://issues.apache.org/jira/browse/KAFKA-963), I am not sure if it is proper to officially publish the artifacts, since it was not what we voted upon, technically speaking.

I don't know for sure whether, when I hit close on the staging release to promote it to a public release, any other errors will come up in the process.

My thinking is that we should roll a beta2 release and give the publish to public release another go. Thoughts?

/* Joe Stein
   http://www.linkedin.com/in/charmalloc
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
*/

--
/* Joe Stein
   http://www.linkedin.com/in/charmalloc
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
*/
[jira] [Commented] (KAFKA-966) Allow high level consumer to 'nak' a message and force Kafka to close the KafkaStream without losing that message
[ https://issues.apache.org/jira/browse/KAFKA-966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13703755#comment-13703755 ]

Joel Koshy commented on KAFKA-966:
----------------------------------
One way to accomplish this is to turn off autocommit and checkpoint offsets only after a message (or batch of messages) has been written to the DB. One caveat, though, is that rebalances (e.g., if a new consumer instance shows up) will result in offsets being committed, so there would be an issue if the DB is unavailable, a rebalance occurs simultaneously, and there are unprocessed messages that have already been pulled out of the iterator.

Allow high level consumer to 'nak' a message and force Kafka to close the KafkaStream without losing that message
------------------------------------------------------------------------------------------------------------------

                Key: KAFKA-966
                URL: https://issues.apache.org/jira/browse/KAFKA-966
            Project: Kafka
         Issue Type: Improvement
         Components: consumer
   Affects Versions: 0.8
           Reporter: Chris Curtin
           Assignee: Neha Narkhede
           Priority: Minor

Enhancement request. The high level consumer is very close to handling a lot of situations a 'typical' client would need, except for when the message received from Kafka is valid but the business logic that wants to consume it has a problem. For example, suppose I want to write the value to a MongoDB or Cassandra database and the database is not available. I won't know until I go to do the write that the database isn't available, but by then it is too late to NOT read the message from Kafka. Thus if I call shutdown() to stop reading, that message is lost, since the offset Kafka writes to ZooKeeper is the next offset.

Ideally I'd like to be able to tell Kafka: close the KafkaStream but set the next offset to read for this partition to this message when I start up again. And if there are any messages in the BlockingQueue for other partitions, find the lowest # and use it for that partition's offset, since I haven't consumed them yet. Thus I can cleanly shut down my processing, resolve whatever the issue is, and restart the process.

Another idea might be to allow a 'peek' into the next message, and if I succeed in writing to the database, call 'next' to remove it from the queue.

I understand this won't deal with a 'kill -9' or hard failure of the JVM leading to the latest offsets not being written to ZooKeeper, but it addresses a likely common scenario for consumers. Nor will it add true transactional support, since the ZK update could fail.
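A minimal sketch of the approach Joel describes, assuming the 0.8 high-level consumer Java API ("auto.commit.enable" and ConsumerConnector.commitOffsets()); the writeToDb helper is hypothetical:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "db-writer");
        props.put("auto.commit.enable", "false"); // turn off autocommit

        ConsumerConnector connector =
            kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("test-topic", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(topicCountMap);
        ConsumerIterator<byte[], byte[]> it = streams.get("test-topic").get(0).iterator();

        while (it.hasNext()) {
            byte[] message = it.next().message();
            writeToDb(message);        // hypothetical DB write; may throw if the DB is down
            connector.commitOffsets(); // checkpoint only after the write succeeds
        }
    }

    private static void writeToDb(byte[] message) {
        // placeholder for the MongoDB/Cassandra write described in the ticket
    }
}

As Joel notes, this does not close the rebalance window: a rebalance commits offsets regardless, so messages already pulled from the iterator but not yet written to the DB can still be lost.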
[jira] [Updated] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed
[ https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Fung updated KAFKA-915:
----------------------------
    Status: Open  (was: Patch Available)

System Test - Mirror Maker testcase_5001 failed
-----------------------------------------------

                Key: KAFKA-915
                URL: https://issues.apache.org/jira/browse/KAFKA-915
            Project: Kafka
         Issue Type: Bug
           Reporter: John Fung
           Assignee: Joel Koshy
           Priority: Critical
             Labels: kafka-0.8, replication-testing
        Attachments: kafka-915-v1.patch, testcase_5001_debug_logs.tar.gz

This case passes if brokers are set to partition = 1, replicas = 1.
It fails if brokers are set to partition = 5, replicas = 3 (consistently reproducible).

This test case is set up as shown below:

1. Start 2 ZK as a cluster in Source
2. Start 2 ZK as a cluster in Target
3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
5. Start 1 MM
6. Start ProducerPerformance to send some data
7. After Producer is done, start ConsoleConsumer to consume data
8. Stop all processes and validate if there is any data loss
9. No failure is introduced to any process in this test

Attached a tar file which contains the logs and system test output for both cases.
[jira] [Commented] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed
[ https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13703827#comment-13703827 ]

John Fung commented on KAFKA-915:
---------------------------------
Hi Joel,

After applying kafka-915-v1.patch (which creates the topic manually before starting the mirror maker), testcase_5001 passes. However, testcase_5003 and testcase_5005 are failing due to data loss.

Thanks,
John
Re: having problem with 0.8 gzip compression
Another piece of information: the snappy compression also does not work.

Thanks,
Scott

On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang <scott.w...@rumbleentertainment.com> wrote:

I just tried it and it is still not showing up. Thanks for looking into this.

Thanks,
Scott

On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao <jun...@gmail.com> wrote:

Could you try starting the consumer first (and enable gzip in the producer)?

Thanks,
Jun

On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <scott.w...@rumbleentertainment.com> wrote:

No, I did not start the consumer before the producer. I actually started the producer first and nothing showed up in the consumer unless I commented out this line -- props.put("compression.codec", "gzip"). If I commented out the compression codec, everything just works.

On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao <jun...@gmail.com> wrote:

Did you start the consumer before the producer? By default, the consumer gets only the new data.

Thanks,
Jun

On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <scott.w...@rumbleentertainment.com> wrote:

I am testing with Kafka 0.8 beta and having a problem receiving messages in the consumer. There is no error, so does anyone have any insights? When I commented out the compression.codec, everything works fine.

My producer:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestKafka08Prod {

    public static void main(String[] args) {
        Producer<Integer, String> producer = null;
        try {
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("producer.type", "sync");
            props.put("request.required.acks", "1");
            props.put("compression.codec", "gzip");
            ProducerConfig config = new ProducerConfig(props);
            producer = new Producer<Integer, String>(config);
            for (int i = 0; i < 10; i++) {
                KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(
                        "test-topic", "test-message: " + i + " " + System.currentTimeMillis());
                producer.send(data);
            }
        } catch (Exception e) {
            System.out.println("Error happened: ");
            e.printStackTrace();
        } finally {
            if (producer != null) { // original had "if (null != null)", so close() was never reached
                producer.close();
            }
            System.out.println("End of Sending");
        }
        System.exit(0);
    }
}

My consumer:

import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class TestKafka08Consumer {

    public static void main(String[] args) throws UnknownHostException, SocketException {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
        props.put("group.id", "test08ConsumerId");
        props.put("zk.sessiontimeout.ms", "4000");
        props.put("zk.synctime.ms", "2000");
        props.put("autocommit.interval.ms", "1000");

        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumerConnector =
                kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

        String topic = "test-topic";
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        while (it.hasNext()) {
            try {
                String fromPlatform = new String(it.next().message());
                System.out.println("The messages: " + fromPlatform);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("SystemOut");
    }
}

Thanks
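For reference, Jun's point that the consumer "gets only the new data" by default is governed by the consumer's auto.offset.reset setting in 0.8. A hedged sketch of the change: with no previously committed offset, "smallest" makes the consumer start from the earliest available message rather than the log end.

// Added to the consumer Properties above; assumes the 0.8 consumer config name.
// With no committed offset, the default ("largest") starts at the log end,
// so messages produced before the consumer connects are skipped.
props.put("auto.offset.reset", "smallest");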
[jira] [Created] (KAFKA-967) Use key range in ProducerPerformance
Guozhang Wang created KAFKA-967:
-----------------------------------

             Summary: Use key range in ProducerPerformance
                 Key: KAFKA-967
                 URL: https://issues.apache.org/jira/browse/KAFKA-967
             Project: Kafka
          Issue Type: Improvement
            Reporter: Guozhang Wang
            Assignee: Guozhang Wang

Currently in ProducerPerformance, the key of the message is set to the MessageID. It would be better to set it to a specific key within a key range (Integer type) so that we can test the semantic partitioning case. This is related to KAFKA-957.
[jira] [Updated] (KAFKA-967) Use key range in ProducerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-967:
--------------------------------
    Attachment: KAFKA-967.v1.patch

1. Add messageKeyRange to ProducerPerfConfig.
2. In generateProducerData, select the key in a round-robin manner within the key range.
3. Set the key in generateMessageWithSeqId and add the key value to the payload.
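A rough Java sketch of the round-robin key selection these patch notes describe (the actual change is inside the Scala ProducerPerformance; messageKeyRange here is a hypothetical stand-in for the new ProducerPerfConfig option, and string keys are used so the stock StringEncoder works):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KeyRangeProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));

        int messageKeyRange = 10; // hypothetical stand-in for the new config option
        for (int i = 0; i < 100; i++) {
            int key = i % messageKeyRange; // round robin within the key range
            // the key is also added to the payload, as in step 3 above
            producer.send(new KeyedMessage<String, String>(
                "test-topic", String.valueOf(key), "key=" + key + " seq=" + i));
        }
        producer.close();
    }
}

Because the partitioner hashes the key, every message with the same key lands on the same partition, which is what makes the semantic-partitioning case testable.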
[jira] [Issue Comment Deleted] (KAFKA-967) Use key range in ProducerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-967:
--------------------------------
    Comment: was deleted

(was:
1. Add messageKeyRange to ProducerPerfConfig.
2. In generateProducerData, select the key in a round-robin manner within the key range.
3. Set the key in generateMessageWithSeqId and add the key value to the payload.)
[jira] [Updated] (KAFKA-967) Use key range in ProducerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-967:
--------------------------------
    Status: Patch Available  (was: Open)

1. Add messageKeyRange to ProducerPerfConfig.
2. In generateProducerData, select the key in a round-robin manner within the key range.
3. Set the key in generateMessageWithSeqId and add the key value to the payload.