Re: [DISCUSS] KIP-1008: ParKa - the Marriage of Parquet and Kafka
> if we can produce the segment with Parquet, which is the native format in a data lake, the consumer application (e.g., Spark jobs for ingestion) can directly dump the segments as raw byte buffers into the data lake without unwrapping each record individually and then writing to the Parquet file one by one, with the expensive steps of encoding and compression again.

This sounds like an interesting idea. I have one concern, though. Data lake/table formats (like Delta Lake, Hudi, Iceberg) have column-level statistics, which are important for query performance. How would column stats be handled in this proposal?

On Tue, Nov 21, 2023 at 9:21 AM Xinli shang wrote:
> Hi, all
>
> Can I ask for a discussion on the just-created KIP-1008: ParKa - the
> Marriage of Parquet and Kafka
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1008%3A+ParKa+-+the+Marriage+of+Parquet+and+Kafka>
> ?
>
> --
> Xinli Shang
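For context on the column-stats concern: Parquet records min/max/null-count statistics per column chunk in the file footer, and table formats lift them into their manifests so query engines can prune row groups. The toy, self-contained sketch below (a hypothetical helper, not part of the KIP's design) illustrates the kind of statistics a producer-side Parquet segment writer would have to compute itself:

```java
import java.util.Arrays;
import java.util.List;

// Toy sketch: per-column min/max/null-count statistics of the kind a
// Parquet writer records for each column chunk. A broker- or
// producer-side Parquet segment writer would need to compute these so
// the dumped files keep their row-group pruning benefits.
public class ColumnStatsSketch {

    public static final class ColumnStats {
        public final long min;
        public final long max;
        public final long nullCount;

        ColumnStats(long min, long max, long nullCount) {
            this.min = min;
            this.max = max;
            this.nullCount = nullCount;
        }
    }

    // Collect stats over one column chunk's values; nulls are counted
    // separately and excluded from min/max. (If all values are null,
    // min/max remain at their sentinel values.)
    public static ColumnStats collect(List<Long> values) {
        long min = Long.MAX_VALUE, max = Long.MIN_VALUE, nulls = 0;
        for (Long v : values) {
            if (v == null) { nulls++; continue; }
            min = Math.min(min, v);
            max = Math.max(max, v);
        }
        return new ColumnStats(min, max, nulls);
    }

    public static void main(String[] args) {
        ColumnStats s = collect(Arrays.asList(3L, null, 7L, 1L));
        System.out.println(s.min + " " + s.max + " " + s.nullCount);
    }
}
```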
Re: Review Request 33760: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33760/ ---

(Updated May 1, 2015, 10:42 p.m.)

Review request for kafka.

Bugs: KAFKA-2121
    https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description (updated)
---
Override the java.io.Closeable#close method in the Serializer and Deserializer interfaces without throwing the checked IOException; this is to avoid breaking source compatibility. Add a test checking that the Serializer is closed during KafkaProducer#close. Add the copyright header missing from the previous check-in. Removed "throws Exception" from test methods.

Diffs (updated)
---
- clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 9a57579f87cb19cb6affe6d157ff8446c23e3551
- clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c44054038066f0d0829d05f082b2ee42b34cded7
- clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java eea2c28450736d1668c68828f77a49470a82c3d0
- clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 49f1427bcbe43c773920a25aa69a71d0329296b7
- clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 6f948f240c906029a0f972bf770f288f390ea714
- clients/src/test/java/org/apache/kafka/test/MockSerializer.java PRE-CREATION

Diff: https://reviews.apache.org/r/33760/diff/

Testing
---

Thanks,
Steven Wu
Review Request 33760: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33760/ ---

Review request for kafka.

Bugs: KAFKA-2121
    https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description
---
Override the java.io.Closeable#close method in the Serializer and Deserializer interfaces without throwing the checked IOException; this is to avoid breaking source compatibility. Add a test checking that the Serializer is closed during KafkaProducer#close.

Diffs
---
- clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 9a57579f87cb19cb6affe6d157ff8446c23e3551
- clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c44054038066f0d0829d05f082b2ee42b34cded7
- clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java eea2c28450736d1668c68828f77a49470a82c3d0
- clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 49f1427bcbe43c773920a25aa69a71d0329296b7
- clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 6f948f240c906029a0f972bf770f288f390ea714
- clients/src/test/java/org/apache/kafka/test/MockSerializer.java PRE-CREATION

Diff: https://reviews.apache.org/r/33760/diff/

Testing
---

Thanks,
Steven Wu
Re: [DISCUSSION] java.io.Closeable in KAFKA-2121
On Tue, Apr 28, 2015 at 1:03 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

Good point Jay. More specifically, since we were already implementing it without the checked exception, we'd need to override close() in the Serializer and Deserializer interfaces and omit the throws clause. That definitely makes them source compatible. Not sure about binary compatibility; I couldn't find a quick answer, but I think it's probably still compatible.

-Ewen

On Tue, Apr 28, 2015 at 12:30 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Hey guys,

You can implement Closeable without the checked exception. Having close() methods throw checked exceptions isn't very useful unless there is a way for the caller to recover. In this case there really isn't, right?

-Jay

On Mon, Apr 27, 2015 at 5:51 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Folks,

In a recent commit I made regarding KAFKA-2121, there is an overlooked API change which makes Serializer / Deserializer extend Closeable, whose close() call can throw IOException by declaration. Hence a scenario like:

    Serializer<T> keySerializer = ...
    Serializer<T> valueSerializer = ...
    KafkaProducer producer = new KafkaProducer(config, keySerializer, valueSerializer)
    // ...
    keySerializer.close()
    valueSerializer.close()

will need to catch IOException now. I want to bring this up for people's attention, and ask your opinion on whether we should revert this change.

-- Guozhang

--
Thanks,
Ewen
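Jay's suggestion works because Java lets an overriding method declare fewer checked exceptions than the method it overrides. A minimal, self-contained sketch (the Serializer body here is simplified, not the real Kafka interface): the interface still extends Closeable, but callers no longer need a try/catch around close().

```java
import java.io.Closeable;
import java.nio.charset.StandardCharsets;

public class CloseableNarrowing {
    // Extend Closeable but override close() without "throws IOException".
    // Narrowing the throws clause in an override is legal Java, so code
    // calling close() through this interface needs no checked-exception
    // handling, while the type remains usable in try-with-resources.
    public interface Serializer<T> extends Closeable {
        byte[] serialize(String topic, T data);

        @Override
        void close(); // no checked IOException
    }

    public static final class StringSerializer implements Serializer<String> {
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
        public void close() { /* nothing to release */ }
    }

    public static void main(String[] args) {
        Serializer<String> s = new StringSerializer();
        byte[] bytes = s.serialize("topic", "hi");
        s.close(); // compiles without catching IOException
        System.out.println(bytes.length);
    }
}
```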
Re: [DISCUSSION] java.io.Closeable in KAFKA-2121
Sorry for the previous empty msg. Jay's idea should work: basically, we override the close() method in the Serializer interface.

    public interface Serializer<T> extends Closeable {
        @Override
        void close();
    }
Review Request 33654: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33654/ ---

Review request for kafka.

Bugs: KAFKA-2121
    https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description
---
Override the java.io.Closeable#close method in the Serializer and Deserializer interfaces without throwing the checked IOException; this is to avoid breaking source compatibility.

Diffs
---
- clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 9a57579f87cb19cb6affe6d157ff8446c23e3551
- clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c44054038066f0d0829d05f082b2ee42b34cded7
- clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java eea2c28450736d1668c68828f77a49470a82c3d0
- clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 49f1427bcbe43c773920a25aa69a71d0329296b7
- clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 6f948f240c906029a0f972bf770f288f390ea714

Diff: https://reviews.apache.org/r/33654/diff/

Testing
---

Thanks,
Steven Wu
Review Request 33574: Patch for KAFKA-2151
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33574/ ---

Review request for kafka.

Bugs: KAFKA-2151
    https://issues.apache.org/jira/browse/KAFKA-2151

Repository: kafka

Description
---
Make MockMetricsReporter a little more generic.

Diffs
---
- clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java eea2c28450736d1668c68828f77a49470a82c3d0
- clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 49f1427bcbe43c773920a25aa69a71d0329296b7
- clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 6f948f240c906029a0f972bf770f288f390ea714

Diff: https://reviews.apache.org/r/33574/diff/

Testing
---

Thanks,
Steven Wu
Re: Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ ---

(Updated April 21, 2015, 5:48 a.m.)

Review request for kafka.

Bugs: KAFKA-2121
    https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description (updated)
---
Move MockMetricsReporter into clients/src/test/java/org/apache/kafka/test.

Diffs (updated)
---
- clients/src/main/java/org/apache/kafka/clients/ClientUtils.java d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416
- clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 96ac6d0cca990eebe90707465d7d8091c069a4b2
- clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 21243345311a106f0802ce96c026ba6e815ccf99
- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a
- clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2
- clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46
- clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 13be6a38cb356d55e25151776328a3c38c573db4
- clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab
- clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java PRE-CREATION
- clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION
- clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java PRE-CREATION

Diff: https://reviews.apache.org/r/33242/diff/

Testing
---

Thanks,
Steven Wu
Re: Review Request 33242: Patch for KAFKA-2121
On April 21, 2015, 3:08 a.m., Guozhang Wang wrote:

> LGTM, besides one minor suggestion: could you move MockMetricsReporter to clients/src/test/java/org/apache/kafka/test?

Done. Moved MockMetricsReporter to clients/src/test/java/org/apache/kafka/test.

- Steven

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/#review80892 ---
Re: Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ ---

(Updated April 20, 2015, 4:51 p.m.)

Review request for kafka.

Bugs: KAFKA-2121
    https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description
---
Fix a potential resource leak when the KafkaProducer constructor fails in the middle.

Diffs (updated)
---
- clients/src/main/java/org/apache/kafka/clients/ClientUtils.java d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416
- clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 96ac6d0cca990eebe90707465d7d8091c069a4b2
- clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 21243345311a106f0802ce96c026ba6e815ccf99
- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a
- clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2
- clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46
- clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 13be6a38cb356d55e25151776328a3c38c573db4
- clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab
- clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java PRE-CREATION
- clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION

Diff: https://reviews.apache.org/r/33242/diff/

Testing
---

Thanks,
Steven Wu
Re: Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ ---

(Updated April 20, 2015, 4:57 p.m.)

Review request for kafka.

Bugs: KAFKA-2121
    https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description
---
Fix a potential resource leak when the KafkaProducer constructor fails in the middle.

Diffs (updated)
---
- clients/src/main/java/org/apache/kafka/clients/ClientUtils.java d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416
- clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 96ac6d0cca990eebe90707465d7d8091c069a4b2
- clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 21243345311a106f0802ce96c026ba6e815ccf99
- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a
- clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2
- clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46
- clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 13be6a38cb356d55e25151776328a3c38c573db4
- clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab
- clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java PRE-CREATION
- clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION

Diff: https://reviews.apache.org/r/33242/diff/

Testing
---

Thanks,
Steven Wu
Re: Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ ---

(Updated April 20, 2015, 4:52 p.m.)

Review request for kafka.

Bugs: KAFKA-2121
    https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description
---
Fix a potential resource leak when the KafkaProducer constructor fails in the middle.

Diffs (updated)
---
- clients/src/main/java/org/apache/kafka/clients/ClientUtils.java d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416
- clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 96ac6d0cca990eebe90707465d7d8091c069a4b2
- clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 21243345311a106f0802ce96c026ba6e815ccf99
- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a
- clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2
- clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46
- clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 13be6a38cb356d55e25151776328a3c38c573db4
- clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab
- clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java PRE-CREATION
- clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION

Diff: https://reviews.apache.org/r/33242/diff/

Testing
---

Thanks,
Steven Wu
Re: Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ ---

(Updated April 20, 2015, 3:08 a.m.)

Review request for kafka.

Bugs: KAFKA-2121
    https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description (updated)
---
Applied the same fix to KafkaConsumer.

Diffs (updated)
---
- clients/src/main/java/org/apache/kafka/clients/ClientUtils.java d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416
- clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 96ac6d0cca990eebe90707465d7d8091c069a4b2
- clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 21243345311a106f0802ce96c026ba6e815ccf99
- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a
- clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46
- clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab
- clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION

Diff: https://reviews.apache.org/r/33242/diff/

Testing
---

Thanks,
Steven Wu
Re: Review Request 33242: Patch for KAFKA-2121
On April 19, 2015, 11:11 p.m., Ewen Cheslack-Postava wrote:

> LGTM! If this gets merged as is, we should file a follow-up issue for the new consumer, which has the same issue.

OK. I applied the same fix to the new consumer, and also updated the JIRA title to reflect the expanded scope.

- Steven

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/#review80642 ---
Re: Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ ---

(Updated April 20, 2015, 3:30 a.m.)

Review request for kafka.

Bugs: KAFKA-2121
    https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description (updated)
---
Applied the same fix to KafkaConsumer. Added a test for KafkaConsumer.

Diffs (updated)
---
- clients/src/main/java/org/apache/kafka/clients/ClientUtils.java d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416
- clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 96ac6d0cca990eebe90707465d7d8091c069a4b2
- clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 21243345311a106f0802ce96c026ba6e815ccf99
- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a
- clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2
- clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46
- clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab
- clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java PRE-CREATION
- clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION

Diff: https://reviews.apache.org/r/33242/diff/

Testing
---

Thanks,
Steven Wu
Re: Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ ---

(Updated April 19, 2015, 3:09 a.m.)

Review request for kafka.

Bugs: KAFKA-2121
    https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description (updated)
---
Fix a potential resource leak when the KafkaProducer constructor fails in the middle.

Diffs (updated)
---
- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a
- clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b3d3d7c56acb445be16a3fbe00f05eaba659be46
- clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab
- clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION

Diff: https://reviews.apache.org/r/33242/diff/

Testing
---

Thanks,
Steven Wu
Re: Review Request 33242: Patch for KAFKA-2121
On April 16, 2015, 5:29 p.m., Ewen Cheslack-Postava wrote:

> clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 548
> https://reviews.apache.org/r/33242/diff/2/?file=931792#file931792line548
>
> One idea for making this less verbose and redundant: make all of these classes implement Closeable so we can just write one utility method for trying to close something and catching the exception.

Steven Wu wrote:

> Yes, I thought about it. It may break binary compatibility, e.g. for Serializer. The Sender and Metrics classes are probably only used internally. Let me know your thoughts.

Ewen Cheslack-Postava wrote:

> I'm pretty sure it's fine, based on this: "Changing the direct superclass or the set of direct superinterfaces of a class type will not break compatibility with pre-existing binaries, provided that the total set of superclasses or superinterfaces, respectively, of the class type loses no members." From https://docs.oracle.com/javase/specs/jls/se7/html/jls-13.html

OK, you could be correct. Here is another reference (easier to understand than the JLS doc): "Expand superinterface set (direct or inherited) - Binary compatible." From https://wiki.eclipse.org/Evolving_Java-based_APIs_2#Evolving_API_Interfaces

- Steven

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/#review80346 ---
Re: Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ ---

(Updated April 16, 2015, 4:55 p.m.)

Review request for kafka.

Bugs: KAFKA-2121
    https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description (updated)
---
Add a unit test file. Changes based on Ewen's review feedback.

Diffs (updated)
---
- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a
- clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION

Diff: https://reviews.apache.org/r/33242/diff/

Testing
---

Thanks,
Steven Wu
Re: Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ ---

(Updated April 16, 2015, 5:03 p.m.)

Review request for kafka.

Changes
---
Address Ewen's review feedback.

> I'm getting a bunch of checkstyle complaints when I try to test. These should all be easy to fix (and should be causing tests to fail before even running). The only rule that might not be obvious from the error message is that the static final field in MockMetricsReporter is expected to be all-caps since it looks like a constant to checkstyle.

Fixed.

> In the constructor, could we throw some subclass of KafkaException instead? The new clients try to stick to that exception hierarchy except in a few special cases. Alternatively, maybe if we caught Error and RuntimeException instead of Throwable then we could just rethrow the same exception?

I changed RuntimeException to KafkaException. I can't think of a good subclass name for this scenario (ProducerConstructException?), hence staying with the generic KafkaException.

> The new version of close() will swallow exceptions when called normally (i.e. not from the constructor). They'll be logged, but the caller won't see the exception anymore. Maybe we should save the first exception and rethrow it?

Refactored into a private close(boolean swallowException) method.

> Exception messages should be capitalized.

Fixed.

> In the test, we should probably have an assert outside the catch. And is there any reason the closeCount is being reset to 0?

Yes, we should have an assert outside the catch. I just reset CLOSE_COUNT in case another test method needs to check the count.

Bugs: KAFKA-2121
    https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description (updated)
---
Fix a potential resource leak when KafkaProducer throws an exception in the middle of construction.

Diffs
---
- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a
- clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION

Diff: https://reviews.apache.org/r/33242/diff/

Testing
---

Thanks,
Steven Wu
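The close(boolean swallowException) refactoring discussed above can be sketched as follows. This is a simplified, hypothetical stand-in (Part, CloseSketch, and the plain RuntimeException in place of KafkaException are all inventions for illustration, not the actual patch): the public close() rethrows the first failure, while the constructor's cleanup path passes swallowException=true so a close() failure cannot mask the original construction error.

```java
import java.util.concurrent.atomic.AtomicReference;

public class CloseSketch {
    // Hypothetical stand-in for the producer's closeable parts
    // (metrics reporters, serializers, sender, ...).
    public interface Part { void close(); }

    private final Part metrics;
    private final Part serializer;

    public CloseSketch(Part metrics, Part serializer) {
        this.metrics = metrics;
        this.serializer = serializer;
    }

    // Normal close: the caller sees the first exception.
    public void close() {
        close(false);
    }

    // swallowException=true is for the constructor's catch block, where
    // rethrowing a close() failure would hide the real construction error.
    private void close(boolean swallowException) {
        AtomicReference<Throwable> firstException = new AtomicReference<>();
        closeQuietly(metrics, firstException);
        closeQuietly(serializer, firstException);
        if (!swallowException && firstException.get() != null)
            throw new RuntimeException("Failed to close sketch", firstException.get());
    }

    private static void closeQuietly(Part p, AtomicReference<Throwable> first) {
        try {
            p.close();
        } catch (Throwable t) {
            first.compareAndSet(null, t); // remember only the first failure
        }
    }

    public static void main(String[] args) {
        Part ok = () -> {};
        Part bad = () -> { throw new IllegalStateException("boom"); };
        try {
            new CloseSketch(bad, ok).close();
        } catch (RuntimeException e) {
            System.out.println(e.getCause().getMessage());
        }
    }
}
```

As noted elsewhere in the thread, the AtomicReference is not strictly needed in single-threaded code; a plain null check would record the first exception just as well.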
Re: Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ ---

(Updated April 16, 2015, 5:44 p.m.)

Review request for kafka.

Bugs: KAFKA-2121
    https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description (updated)
---
Add a unit test file. Changes based on Ewen's review feedback. Fix capitalization in an error log.

Diffs (updated)
---
- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a
- clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION

Diff: https://reviews.apache.org/r/33242/diff/

Testing
---

Thanks,
Steven Wu
Re: Review Request 33242: Patch for KAFKA-2121
On April 16, 2015, 5:29 p.m., Ewen Cheslack-Postava wrote:

> clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 531
> https://reviews.apache.org/r/33242/diff/2/?file=931792#file931792line531
>
> This code is all single threaded, is the AtomicReference really necessary here?

Not really necessary; I was just trying to use compareAndSet. Otherwise, I need to do if (firstException == null) firstException = t. I can certainly change it; let me know.

On April 16, 2015, 5:29 p.m., Ewen Cheslack-Postava wrote:

> clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 548
> https://reviews.apache.org/r/33242/diff/2/?file=931792#file931792line548
>
> One idea for making this less verbose and redundant: make all of these classes implement Closeable so we can just write one utility method for trying to close something and catching the exception.

Yes, I thought about it. It may break binary compatibility, e.g. for Serializer. The Sender and Metrics classes are probably only used internally. Let me know your thoughts.

- Steven

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/#review80346 ---
Re: Review Request 33242: Patch for KAFKA-2121
On April 16, 2015, 5:29 p.m., Ewen Cheslack-Postava wrote:

> Looks good, left a few comments. KafkaConsumer suffers from this same problem. Patching that should be pretty much identical -- any chance you could extend this to cover that as well?

Sure, I can extend this to KafkaConsumer later.

- Steven

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/#review80346 ---
Review Request 33242: Patch for KAFKA-2121
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33242/ ---

Review request for kafka.

Bugs: KAFKA-2121
    https://issues.apache.org/jira/browse/KAFKA-2121

Repository: kafka

Description
---
Add a unit test file.

Diffs
---
- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a
- clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java PRE-CREATION

Diff: https://reviews.apache.org/r/33242/diff/

Testing
---

Thanks,
Steven Wu
Re: [DISCUSS] error handling in java KafkaProducer
Thanks, Ewen and Guozhang! I will go with the try-catch option then. here is the jira. feel free to assign it to me. I will try to submit a patch this week. https://issues.apache.org/jira/browse/KAFKA-2121 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible, I'm with Ewen regarding the solution. On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change). Additionally, the code creating the producer would have to be more complicated since it would have to deal with the cleanup carefully whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock that lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields.
(You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called). Second, add the try/catch as you suggested, but just use close(). -Ewen On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem that we have encountered when the 0.8.2 java KafkaProducer failed in the constructor. here is the code snippet of KafkaProducer to illustrate the problem. --- public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { // create metrics reporters via reflection List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); // validate bootstrap servers List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); } --- let's say MyMetricsReporter creates a thread in its constructor. if hostname validation threw an exception, the constructor won't call the close method of MyMetricsReporter to clean up the resource. as a result, we created a thread leak issue. this becomes worse when we try to auto-recover (i.e. keep creating KafkaProducer again - failing again - more thread leaks). there are multiple options for fixing this. 1) just move the hostname validation to the beginning. but this only fixes one symptom. it doesn't fix the fundamental problem. what if some other lines throw an exception. 2) use try-catch. in the catch section, try to call close methods for any non-null objects constructed so far. 3) explicitly declare the dependency in the constructor. this way, when KafkaProducer throws an exception, I can call the close method of the metrics reporters to release resources. KafkaProducer(..., List<MetricsReporter> reporters) we don't need a dependency injection framework. but generally hiding a dependency is a bad coding practice.
it is also hard to plug in mocks for dependencies. this is probably the most intrusive change. I am willing to submit a patch. but I'd like to hear your opinions on how we should fix the issue. Thanks, Steven -- Thanks, Ewen -- -- Guozhang
Re: [DISCUSS] error handling in java KafkaProducer
I submitted a patch attempt in the jira. On Tue, Apr 14, 2015 at 10:16 AM, Steven Wu stevenz...@gmail.com wrote: Thanks, Ewen and Guozhang! I will go with the try-catch option then. here is the jira. feel free to assign it to me. I will try to submit a patch this week. https://issues.apache.org/jira/browse/KAFKA-2121 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote: It is a valid problem and we should correct it as soon as possible, I'm with Ewen regarding the solution. On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven, Looks like there is even more that could potentially be leaked -- since key and value serializers are created and configured at the end, even the IO thread allocated by the producer could leak. Given that, I think 1 isn't a great option since, as you said, it doesn't really address the underlying issue. 3 strikes me as bad from a user experience perspective. It's true we might want to introduce additional constructors to make testing easier, but the more components I need to allocate myself and inject into the producer's constructor, the worse the default experience is. And since you would have to inject the dependencies to get correct, non-leaking behavior, it will always be more code than previously (and a backwards incompatible change). Additionally, the code creating the producer would have to be more complicated since it would have to deal with the cleanup carefully whereas it previously just had to deal with the exception. Besides, for testing specifically, you can avoid exposing more constructors just for testing by using something like PowerMock that lets you mock private methods. That requires a bit of code reorganization, but doesn't affect the public interface at all. So my take is that a variant of 2 is probably best. I'd probably do two things. First, make close() safe to call even if some fields haven't been initialized, which presumably just means checking for null fields.
(You might also want to figure out if all the methods close() calls are idempotent and decide whether some fields should be marked non-final and cleared to null when close() is called). Second, add the try/catch as you suggested, but just use close(). -Ewen On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote: Here is the resource leak problem that we have encountered when the 0.8.2 java KafkaProducer failed in the constructor. here is the code snippet of KafkaProducer to illustrate the problem. --- public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { // create metrics reporters via reflection List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); // validate bootstrap servers List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); } --- let's say MyMetricsReporter creates a thread in its constructor. if hostname validation threw an exception, the constructor won't call the close method of MyMetricsReporter to clean up the resource. as a result, we created a thread leak issue. this becomes worse when we try to auto-recover (i.e. keep creating KafkaProducer again - failing again - more thread leaks). there are multiple options for fixing this. 1) just move the hostname validation to the beginning. but this only fixes one symptom. it doesn't fix the fundamental problem. what if some other lines throw an exception. 2) use try-catch. in the catch section, try to call close methods for any non-null objects constructed so far. 3) explicitly declare the dependency in the constructor. this way, when KafkaProducer throws an exception, I can call the close method of the metrics reporters to release resources. KafkaProducer(..., List<MetricsReporter> reporters) we don't need a dependency injection framework. but generally hiding a dependency is a bad coding practice.
it is also hard to plug in mocks for dependencies. this is probably the most intrusive change. I am willing to submit a patch. but I'd like to hear your opinions on how we should fix the issue. Thanks, Steven -- Thanks, Ewen -- -- Guozhang
[DISCUSS] error handling in java KafkaProducer
Here is the resource leak problem that we have encountered when the 0.8.2 java KafkaProducer failed in the constructor. here is the code snippet of KafkaProducer to illustrate the problem. --- public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { // create metrics reporters via reflection List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); // validate bootstrap servers List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); } --- let's say MyMetricsReporter creates a thread in its constructor. if hostname validation threw an exception, the constructor won't call the close method of MyMetricsReporter to clean up the resource. as a result, we created a thread leak issue. this becomes worse when we try to auto-recover (i.e. keep creating KafkaProducer again - failing again - more thread leaks). there are multiple options for fixing this. 1) just move the hostname validation to the beginning. but this only fixes one symptom. it doesn't fix the fundamental problem. what if some other lines throw an exception. 2) use try-catch. in the catch section, try to call close methods for any non-null objects constructed so far. 3) explicitly declare the dependency in the constructor. this way, when KafkaProducer throws an exception, I can call the close method of the metrics reporters to release resources. KafkaProducer(..., List<MetricsReporter> reporters) we don't need a dependency injection framework. but generally hiding a dependency is a bad coding practice. it is also hard to plug in mocks for dependencies. this is probably the most intrusive change. I am willing to submit a patch. but I'd like to hear your opinions on how we should fix the issue. Thanks, Steven
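Option 2 above can be shown in a minimal, self-contained sketch. All class names here are invented for illustration (the real fix landed inside KafkaProducer as part of KAFKA-2121); `CountingReporter` stands in for a metrics reporter that starts a thread in its constructor:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of option 2: wrap the constructor body in try/catch and, on failure,
// close every resource created so far before rethrowing.
class LeakFreeClient implements AutoCloseable {
    static final AtomicInteger CLOSE_COUNT = new AtomicInteger();

    static class CountingReporter implements AutoCloseable {
        @Override public void close() { CLOSE_COUNT.incrementAndGet(); }
    }

    private final List<CountingReporter> reporters = new ArrayList<>();

    LeakFreeClient(boolean failValidation) {
        try {
            reporters.add(new CountingReporter());   // resource created early
            if (failValidation)                      // e.g. the bootstrap-server check
                throw new IllegalArgumentException("invalid bootstrap servers");
        } catch (RuntimeException e) {
            close();                                 // release partial state
            throw e;
        }
    }

    @Override public void close() {
        for (CountingReporter r : reporters) r.close();
    }
}
```

A test in this style (construct with a failure injected, then assert on a close counter) is the approach the KAFKA-2121 review thread describes for verifying the fix.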
Re: Metrics package discussion
My main concern is that if we don't do the migration in 0.8.3, we will be left with some metrics in YM format and some others in KM format (as we start sharing client code on the broker). This is probably a worse situation to be in. +1. I am not sure how our Servo adaptor would work if there are two formats for metrics, unless there is an easy way to check the format (YM/KM). On Tue, Mar 31, 2015 at 9:40 AM, Jun Rao j...@confluent.io wrote: (2) The metrics are clearly part of the client API and we are not changing that (at least for the new client). Arguably, the metrics are also part of the broker side API. However, since they affect fewer parties (mostly just the Kafka admins), it may be easier to make those changes. My main concern is that if we don't do the migration in 0.8.3, we will be left with some metrics in YM format and some others in KM format (as we start sharing client code on the broker). This is probably a worse situation to be in. Thanks, Jun On Tue, Mar 31, 2015 at 9:26 AM, Gwen Shapira gshap...@cloudera.com wrote: (2) I believe we agreed that our metrics are a public API. I believe we also agree we don't break APIs in minor releases. So, it seems obvious to me that we can't make breaking changes to metrics in minor releases. I'm not convinced "we did it in the past" is a good reason to do it again. Is there a strong reason to do it in the 0.8.3 time-frame? On Tue, Mar 31, 2015 at 7:59 AM, Jun Rao j...@confluent.io wrote: (2) Not sure why we can't do this in 0.8.3. We changed the metrics names in 0.8.2 already. Given that we need to share code between the client and the core, and we need to keep the metrics consistent on the broker, it seems that we have no choice but to migrate to KM. If so, it seems that the sooner we do this, the better. It is important to give people an easy path for migration. However, it may not be easy to keep the mbean names exactly the same. For example, YM has hardcoded attributes (e.g.
1-min-rate, 5-min-rate, 15-min-rate, etc for rates) that are not available in KM. One benefit out of this migration is that one can get the metrics in the client and the broker in the same way. Thanks, Jun On Mon, Mar 30, 2015 at 9:26 PM, Gwen Shapira gshap...@cloudera.com wrote: (1) It will be interesting to see what others use for monitoring integration, to see what is already covered with existing JMX integrations and what needs special support. (2) I think the migration story is more important - this is a non-compatible change, right? So we can't do it in 0.8.3 timeframe, it has to be in 0.9? And we need to figure out how will users migrate - do we just tell everyone please reconfigure all your monitors from scratch - don't worry, it is worth it? I know you keep saying we did it before and our users are used to it, but I think there are a lot more users now, and some of them have different compatibility expectations. We probably need to find: * A least painful way to migrate - can we keep the names of at least most of the metrics intact? * Good explanation of what users gain from this painful migration (i.e. more accurate statistics due to gazillion histograms) On Mon, Mar 30, 2015 at 6:29 PM, Jun Rao j...@confluent.io wrote: If we are committed to migrating the broker side metrics to KM for the next release, we will need to (1) have a story on supporting common reporters (as listed in KAFKA-1930), and (2) see if the current histogram support is good enough for measuring things like request time. Thanks, Jun On Mon, Mar 30, 2015 at 3:03 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: If we do plan to use the network code in client, I think that is a good reason in favor of migration. It will be unnecessary to have metrics from multiple libraries coexist since our users will have to start monitoring these new metrics anyway. 
I also agree with Jay that in multi-tenant clusters people care about detailed statistics for their own application over global numbers. Based on the arguments so far, I'm +1 for migrating to KM. Thanks, Aditya From: Jun Rao [j...@confluent.io] Sent: Sunday, March 29, 2015 9:44 AM To: dev@kafka.apache.org Subject: Re: Metrics package discussion There is another thing to consider. We plan to reuse the client components on the server side over time. For example, as part of the security work, we are looking into replacing the server side network code with the client network code (KAFKA-1928). However, the client network already has metrics based on KM. Thanks, Jun On Sat, Mar 28, 2015 at 1:34 PM, Jay Kreps jay.kr...@gmail.com wrote: I
Re: [KIP-DISCUSSION] KIP-13 Quotas
separately. That will also contain a section on quotas. 3. Dynamic Configuration management - Being discussed in KIP-5. Basically we need something that will model default quotas and allow per-client overrides. Is there something else that I'm missing? Thanks, Aditya From: Jay Kreps [jay.kr...@gmail.com] Sent: Wednesday, March 18, 2015 2:10 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas Hey Steven, The current proposal is actually to enforce quotas at the client/application level, NOT the topic level. So if you have a service with a few dozen instances the quota is against all of those instances added up across all their topics. So actually the effect would be the same either way but throttling gives the producer the choice of either blocking or dropping. -Jay On Tue, Mar 17, 2015 at 10:08 AM, Steven Wu stevenz...@gmail.com wrote: Jay, let's say an app produces to 10 different topics. one of the topic is sent from a library. due to whatever condition/bug, this lib starts to send messages over the quota. if we go with the delayed response approach, it will cause the whole shared RecordAccumulator buffer to be filled up. that will penalize other 9 topics who are within the quota. that is the unfairness point that Ewen and I were trying to make. if broker just drop the msg and return an error/status code indicates the drop and why. then producer can just move on and accept the drop. shared buffer won't be saturated and other 9 topics won't be penalized. Thanks, Steven On Tue, Mar 17, 2015 at 9:44 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Steven, It is true that hitting the quota will cause back-pressure on the producer. But the solution is simple, a producer that wants to avoid this should stay under its quota. In other words this is a contract between the cluster and the client, with each side having something to uphold. 
Quite possibly the same thing will happen in the absence of a quota, a client that produces an unexpected amount of load will hit the limits of the server and experience backpressure. Quotas just allow you to set that same limit at something lower than 100% of all resources on the server, which is useful for a shared cluster. -Jay On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com wrote: wait. we create one kafka producer for each cluster. each cluster can have many topics. if producer buffer got filled up due to delayed response for one throttled topic, won't that penalize other topics unfairly? it seems to me that broker should just return error without delay. sorry that I am chatting to myself :) On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com wrote: I think I can answer my own question. delayed response will cause the producer buffer to be full, which then result in either thread blocking or message drop. On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote: please correct me if I am missing sth here. I am not understanding how would throttle work without cooperation/back-off from producer. new Java producer supports non-blocking API. why would delayed response be able to slow down producer? producer will continue to fire async sends. On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote: I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both these approaches assume some kind of well-behaveness of the clients: option a) assumes the client sets an proper timeout value while
Re: [KIP-DISCUSSION] KIP-13 Quotas
wait. we create one kafka producer for each cluster. each cluster can have many topics. if the producer buffer got filled up due to delayed responses for one throttled topic, won't that penalize the other topics unfairly? it seems to me that the broker should just return an error without delay. sorry that I am chatting to myself :) On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com wrote: I think I can answer my own question. a delayed response will cause the producer buffer to be full, which then results in either thread blocking or message drop. On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote: please correct me if I am missing sth here. I am not understanding how throttling would work without cooperation/back-off from the producer. the new Java producer supports a non-blocking API. why would a delayed response be able to slow down the producer? the producer will continue to fire async sends. On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote: I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both these approaches assume some kind of well-behavedness of the clients: option a) assumes the client sets a proper timeout value and can just ignore the OKButThrottled response, while option b) assumes the client handles the FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying either intentionally or not, neither of these approaches is actually effective. 2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes. Today we have already incorporated some status codes as error codes in the responses, e.g.
ReplicaNotAvailable in MetadataResponse, the pros of this is of course using a single field for response status like the HTTP status codes, while the cons is that it requires clients to handle the error codes carefully. I think maybe we can actually extend the single-code approach to overcome its drawbacks, that is, wrap the error codes semantics to the users so that users do not need to handle the codes one-by-one. More concretely, following Jay's example the client could write sth. like this: - if(error.isOK()) // status code is good or the code can be simply ignored for this request type, process the request else if(error.needsRetry()) // throttled, transient error, etc: retry else if(error.isFatal()) // non-retriable errors, etc: notify / terminate / other handling - Only when the clients really want to handle, for example FailDuetoThrottled status code specifically, it needs to: if(error.isOK()) // status code is good or the code can be simply ignored for this request type, process the request else if(error == FailDuetoThrottled ) // throttled: log it else if(error.needsRetry()) // transient error, etc: retry else if(error.isFatal()) // non-retriable errors, etc: notify / terminate / other handling - And for implementation we can probably group the codes accordingly like HTTP status codes such that we can do: boolean Error.isOK() { return code < 300 && code >= 200; } Guozhang On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Agreed that trying to shoehorn non-error codes into the error field is a bad idea. It makes it *way* too easy to write code that looks (and should be) correct but is actually incorrect. If necessary, I think it's much better to spend a couple of extra bytes to encode that information separately (a status or warning section of the response). An indication that throttling is occurring is something I'd expect to be indicated by a bit flag in the response rather than as an error code.
Gwen - I think an error code makes sense when the request actually failed. Option B, which Jun was advocating, would have appended the messages successfully. If the rate-limiting case you're talking about had successfully committed the messages, I would say that's also a bad use of error codes. On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira gshap...@cloudera.com wrote: We discussed an error code for rate-limiting (which I think made sense), isn't it a similar case? On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com wrote: My concern is that as soon as you start encoding non-error response information into error codes the next question is what to do if two such codes apply (i.e. you have a replica down and the response is quota'd). I think I am trying to argue that error should mean why we failed your request, for which there will really only be one reason, and any other useful
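Guozhang's HTTP-style grouping of status codes into ranges, mentioned in this thread, can be sketched as follows. The class name and the concrete numeric values are invented for illustration; Kafka's actual protocol never adopted range-based status codes:

```java
// Hypothetical HTTP-style status ranges: 2xx = success (possibly with a
// warning such as "throttled"), 3xx = retriable, 4xx = fatal. Clients branch
// on the range instead of handling each code one-by-one.
final class StatusCode {
    static final int OK                = 200;
    static final int OK_BUT_THROTTLED  = 201;  // assumed code, not a real Kafka error
    static final int RETRIABLE_ERROR   = 300;
    static final int FATAL_ERROR       = 400;

    static boolean isOK(int code)       { return code >= 200 && code < 300; }
    static boolean needsRetry(int code) { return code >= 300 && code < 400; }
    static boolean isFatal(int code)    { return code >= 400; }
}
```

With this grouping, a client that does not care about throttling specifically still treats OK_BUT_THROTTLED as success, which is the "users do not need to handle the codes one-by-one" property Guozhang is after.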
Re: [KIP-DISCUSSION] KIP-13 Quotas
I think I can answer my own question. a delayed response will cause the producer buffer to be full, which then results in either thread blocking or message drop. On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote: please correct me if I am missing sth here. I am not understanding how throttling would work without cooperation/back-off from the producer. the new Java producer supports a non-blocking API. why would a delayed response be able to slow down the producer? the producer will continue to fire async sends. On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote: I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both these approaches assume some kind of well-behavedness of the clients: option a) assumes the client sets a proper timeout value and can just ignore the OKButThrottled response, while option b) assumes the client handles the FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying either intentionally or not, neither of these approaches is actually effective. 2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes. Today we have already incorporated some status codes as error codes in the responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros of this is of course using a single field for response status like the HTTP status codes, while the cons is that it requires clients to handle the error codes carefully. I think maybe we can actually extend the single-code approach to overcome its drawbacks, that is, wrap the error codes semantics to the users so that users do not need to handle the codes one-by-one. More concretely, following Jay's example the client could write sth.
like this: - if(error.isOK()) // status code is good or the code can be simply ignored for this request type, process the request else if(error.needsRetry()) // throttled, transient error, etc: retry else if(error.isFatal()) // non-retriable errors, etc: notify / terminate / other handling - Only when the clients really want to handle, for example FailDuetoThrottled status code specifically, it needs to: if(error.isOK()) // status code is good or the code can be simply ignored for this request type, process the request else if(error == FailDuetoThrottled ) // throttled: log it else if(error.needsRetry()) // transient error, etc: retry else if(error.isFatal()) // non-retriable errors, etc: notify / terminate / other handling - And for implementation we can probably group the codes accordingly like HTTP status codes such that we can do: boolean Error.isOK() { return code < 300 && code >= 200; } Guozhang On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Agreed that trying to shoehorn non-error codes into the error field is a bad idea. It makes it *way* too easy to write code that looks (and should be) correct but is actually incorrect. If necessary, I think it's much better to spend a couple of extra bytes to encode that information separately (a status or warning section of the response). An indication that throttling is occurring is something I'd expect to be indicated by a bit flag in the response rather than as an error code. Gwen - I think an error code makes sense when the request actually failed. Option B, which Jun was advocating, would have appended the messages successfully. If the rate-limiting case you're talking about had successfully committed the messages, I would say that's also a bad use of error codes. On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira gshap...@cloudera.com wrote: We discussed an error code for rate-limiting (which I think made sense), isn't it a similar case?
On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com wrote: My concern is that as soon as you start encoding non-error response information into error codes the next question is what to do if two such codes apply (i.e. you have a replica down and the response is quota'd). I think I am trying to argue that error should mean why we failed your request, for which there will really only be one reason, and any other useful information we want to send back is just another field in the response. -Jay On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira gshap...@cloudera.com wrote: I think it's not too late to reserve a set of error codes (200-299?) for non-error codes. It won't be backward compatible (i.e. clients that currently do else throw will throw on non-errors), but perhaps
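Ewen's and Jay's alternative in this thread, keeping the error code to mean only "why we failed your request" and carrying throttling as a separate field or bit flag, might look like this. The field layout and bit assignment are invented for illustration, not the real Kafka wire format:

```java
// Throttling carried as a bit flag in a separate "status" field of the
// response, leaving the error code untouched. Multiple non-error conditions
// can then coexist (e.g. throttled AND a replica warning) without the
// "which error code wins" problem Jay describes.
final class ResponseFlags {
    static final int THROTTLED = 0x1;  // hypothetical bit assignment

    static boolean isThrottled(int flags) { return (flags & THROTTLED) != 0; }

    static int withThrottled(int flags)   { return flags | THROTTLED; }
}
```

A client ignorant of quotas simply never inspects the flags field and keeps working, which is the backward-compatibility property the thread is debating.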
Re: [KIP-DISCUSSION] KIP-13 Quotas
please correct me if I am missing sth here. I am not understanding how throttling would work without cooperation/back-off from the producer. the new Java producer supports a non-blocking API. why would a delayed response be able to slow down the producer? the producer will continue to fire async sends. On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote: I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both these approaches assume some kind of well-behavedness of the clients: option a) assumes the client sets a proper timeout value and can just ignore the OKButThrottled response, while option b) assumes the client handles the FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying either intentionally or not, neither of these approaches is actually effective. 2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes. Today we have already incorporated some status codes as error codes in the responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros of this is of course using a single field for response status like the HTTP status codes, while the cons is that it requires clients to handle the error codes carefully. I think maybe we can actually extend the single-code approach to overcome its drawbacks, that is, wrap the error codes semantics to the users so that users do not need to handle the codes one-by-one. More concretely, following Jay's example the client could write sth.
like this: - if(error.isOK()) // status code is good or the code can be simply ignored for this request type, process the request else if(error.needsRetry()) // throttled, transient error, etc: retry else if(error.isFatal()) // non-retriable errors, etc: notify / terminate / other handling - Only when the clients really want to handle, for example FailDuetoThrottled status code specifically, it needs to: if(error.isOK()) // status code is good or the code can be simply ignored for this request type, process the request else if(error == FailDuetoThrottled ) // throttled: log it else if(error.needsRetry()) // transient error, etc: retry else if(error.isFatal()) // non-retriable errors, etc: notify / terminate / other handling - And for implementation we can probably group the codes accordingly like HTTP status codes such that we can do: boolean Error.isOK() { return code < 300 && code >= 200; } Guozhang On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Agreed that trying to shoehorn non-error codes into the error field is a bad idea. It makes it *way* too easy to write code that looks (and should be) correct but is actually incorrect. If necessary, I think it's much better to spend a couple of extra bytes to encode that information separately (a status or warning section of the response). An indication that throttling is occurring is something I'd expect to be indicated by a bit flag in the response rather than as an error code. Gwen - I think an error code makes sense when the request actually failed. Option B, which Jun was advocating, would have appended the messages successfully. If the rate-limiting case you're talking about had successfully committed the messages, I would say that's also a bad use of error codes. On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira gshap...@cloudera.com wrote: We discussed an error code for rate-limiting (which I think made sense), isn't it a similar case?
On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com wrote: My concern is that as soon as you start encoding non-error response information into error codes the next question is what to do if two such codes apply (i.e. you have a replica down and the response is quota'd). I think I am trying to argue that error should mean why we failed your request, for which there will really only be one reason, and any other useful information we want to send back is just another field in the response. -Jay On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira gshap...@cloudera.com wrote: I think it's not too late to reserve a set of error codes (200-299?) for non-error codes. It won't be backward compatible (i.e. clients that currently do else throw will throw on non-errors), but perhaps it's worthwhile. On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Jun, I'd really really really like to avoid that. Having just spent a bunch of time on the clients, using the error codes to
Re: [KIP-DISCUSSION] KIP-13 Quotas
Ewen, I see your point regarding the shared buffer. yes, a bad/slow broker could potentially consume the entire buffer. On the other hand, I do like the batching behavior of the shared RecordAccumulator buffer. On Tue, Mar 17, 2015 at 8:25 AM, Guozhang Wang wangg...@gmail.com wrote: Ewen, 1. I think we are on the same page as per malicious clients, that they should not be the target of either approach. I was just trying to separate that from the "what if the user just keeps retrying" discussion, and maybe I was not clear. 2. I was not advocating option A on the wiki; in my previous email I actually assumed that option is already dropped and we are only considering option B (which is my option b) in the email) and C (option a) in my email), and I think with some proper wrapping of status codes (today we still call them error codes) option B in the wiki may not necessarily require people who implement clients to handle each status code one-by-one. Guozhang On Tue, Mar 17, 2015 at 12:22 AM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven - that's a reasonable concern. I think I've mentioned the same sort of issue in the issues about the new producer's RecordAccumulator not timing out sends, e.g. in https://issues.apache.org/jira/browse/KAFKA-1788 . The shared buffer causes problems if one broker isn't available for a while since messages to that broker end up consuming the entire buffer. You can end up with a similar problem here due to the effective rate limiting caused by delaying responses. Guozhang - I think only option A from the KIP is actually an error.
If we want to look to HTTP for examples, there's an RFC that defines the Too Many Requests response to handle rate limiting: http://tools.ietf.org/html/rfc6585#page-3 In this case, it actually is an error, specifically a client error since it's in the 400 range. The implication from the status code (429), the name of the response, and the example given is that it is an error and no real data is returned, which would correspond to option A from the KIP. Note that the protocol provides a mechanism for giving extra (optional) information about when you should retry (via headers). I'd guess that even despite that, most systems that encounter a 429 use some ad hoc backoff mechanism because they only try to detect anything in the 400 range... One additional point -- I think malicious clients shouldn't be our target here; they can do a lot worse than what's been addressed in this thread. But I do agree that any proposal should have a clear explanation of how existing clients that are ignorant of quotas would behave (which is why options b and c make a lot of sense -- they rate limit without requiring an update to normally-behaving clients). On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com wrote: wait. we create one kafka producer for each cluster. each cluster can have many topics. if the producer buffer got filled up due to delayed responses for one throttled topic, won't that penalize other topics unfairly? it seems to me that the broker should just return the error without delay. sorry that I am chatting to myself :) On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com wrote: I think I can answer my own question. a delayed response will cause the producer buffer to be full, which then results in either thread blocking or message drops. On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote: please correct me if I am missing something here. I am not understanding how throttling would work without cooperation/back-off from the producer.
the new Java producer supports a non-blocking API. why would a delayed response be able to slow down the producer? the producer will continue to fire async sends. On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote: I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both these approaches assume some kind of well-behavedness of the clients: option a) assumes the client sets a proper timeout value and can simply ignore the OKButThrottled response, while option b) assumes the client handles FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying either intentionally or not, neither of these approaches is actually effective. 2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes? Today we have already incorporated some status codes as error codes in the responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros
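Ewen's RFC 6585 reference implies the usual client-side pattern: on a 429, honor the optional Retry-After header when present, otherwise fall back to capped exponential backoff. A hedged sketch in plain Java — the method name, defaults, and 30s cap are illustrative assumptions, and no real HTTP client is involved:

```java
// Hedged sketch of handling HTTP 429 (RFC 6585) on the client side.
public class RetryAfter {
    static long backoffMs(int statusCode, String retryAfterHeader,
                          int attempt, long baseMs) {
        if (statusCode != 429) return 0; // not throttled: no wait
        if (retryAfterHeader != null) {
            try {
                return Long.parseLong(retryAfterHeader.trim()) * 1000L; // server's hint wins
            } catch (NumberFormatException ignored) {
                // malformed header: fall through to backoff
            }
        }
        return Math.min(baseMs << attempt, 30_000L); // exponential, capped at 30s
    }

    public static void main(String[] args) {
        System.out.println(backoffMs(429, "2", 0, 100));  // 2000
        System.out.println(backoffMs(429, null, 3, 100)); // 800
        System.out.println(backoffMs(200, null, 0, 100)); // 0
    }
}
```

This is exactly the "ad hoc backoff" Ewen predicts most clients would use when they don't bother parsing the optional header.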
Re: [KIP-DISCUSSION] KIP-13 Quotas
Jay, let's say an app produces to 10 different topics. one of the topics is written to by a library. due to some condition/bug, this lib starts to send messages over the quota. if we go with the delayed-response approach, it will cause the whole shared RecordAccumulator buffer to be filled up. that will penalize the other 9 topics that are within the quota. that is the unfairness point that Ewen and I were trying to make. if the broker just drops the msg and returns an error/status code indicating the drop and why, then the producer can just move on and accept the drop. the shared buffer won't be saturated and the other 9 topics won't be penalized. Thanks, Steven On Tue, Mar 17, 2015 at 9:44 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Steven, It is true that hitting the quota will cause back-pressure on the producer. But the solution is simple: a producer that wants to avoid this should stay under its quota. In other words this is a contract between the cluster and the client, with each side having something to uphold. Quite possibly the same thing will happen in the absence of a quota: a client that produces an unexpected amount of load will hit the limits of the server and experience backpressure. Quotas just allow you to set that same limit at something lower than 100% of all resources on the server, which is useful for a shared cluster. -Jay On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com wrote: wait. we create one kafka producer for each cluster. each cluster can have many topics. if the producer buffer got filled up due to delayed responses for one throttled topic, won't that penalize other topics unfairly? it seems to me that the broker should just return the error without delay. sorry that I am chatting to myself :) On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com wrote: I think I can answer my own question. a delayed response will cause the producer buffer to be full, which then results in either thread blocking or message drops.
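One practical mitigation for the shared-buffer stall described above is to bound the accumulator and how long a send may block on it. A hedged config sketch — property names follow the new Java producer, and max.block.ms in particular is an assumption about the reader's client version (older clients used block.on.buffer.full instead):

```java
// Hedged sketch: cap the shared RecordAccumulator and the time send()
// may block, so one throttled destination can't stall callers forever.
import java.util.Properties;

public class ProducerBufferConfig {
    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-a:9092,broker-b:9092"); // placeholder hosts
        props.put("buffer.memory", "33554432"); // 32 MB shared accumulator
        props.put("max.block.ms", "5000");      // fail send() after 5s rather than block forever
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

With a bounded block time, a saturated buffer surfaces as a send failure the application can handle, instead of silently back-pressuring the other 9 topics.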
On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote: please correct me if I am missing something here. I am not understanding how throttling would work without cooperation/back-off from the producer. the new Java producer supports a non-blocking API. why would a delayed response be able to slow down the producer? the producer will continue to fire async sends. On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote: I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both these approaches assume some kind of well-behavedness of the clients: option a) assumes the client sets a proper timeout value and can simply ignore the OKButThrottled response, while option b) assumes the client handles FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying either intentionally or not, neither of these approaches is actually effective. 2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes? Today we have already incorporated some status codes as error codes in the responses, e.g. ReplicaNotAvailable in MetadataResponse. the pro of this is of course using a single field for response status like the HTTP status codes, while the con is that it requires clients to handle the error codes carefully. I think maybe we can actually extend the single-code approach to overcome its drawbacks, that is, wrap the error-code semantics for the users so that users do not need to handle the codes one-by-one. More concretely, following Jay's example the client could write something
like this:

    if (error.isOK())            // status code is good, or the code can simply be ignored for this request type: process the request
    else if (error.needsRetry()) // throttled, transient error, etc.: retry
    else if (error.isFatal())    // non-retriable errors, etc.: notify / terminate / other handling

Only when the client really wants to handle, for example, the FailDuetoThrottled status code specifically does it need to:

    if (error.isOK())                     // status code is good, or the code can simply be ignored for this request type: process the request
    else if (error == FailDuetoThrottled) // throttled: log it
    else if (error.needsRetry())          // transient error, etc.: retry
    else if (error.isFatal())             // non-retriable errors, etc.: notify / terminate / other handling

And for the implementation we can probably group the codes like HTTP status codes, such that we can do
Re: New consumer client
Jay, we have observed CRC corruption too, occasionally. I reported it in a thread and asked how we should handle some error conditions from the old high-level consumer. On Mon, Feb 9, 2015 at 11:36 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jay, 1) Sorry to get back to you so late. It is a CRC check error on any consumer thread regardless of the server. What happens is I have to catch this exception and skip the message now. There is no option to re-fetch this message. Is there any way to add behavior in the Java consumer to re-fetch this CRC-failed offset? 2) Secondly, can you please add default behavior to auto-set 'fetch.message.max.bytes' = broker's message.max.bytes. This will ensure smooth configuration for both the simple and high-level consumer. This will take the burden away from the Kafka user to configure this property. We had a lag issue due to this misconfiguration and dropped messages on the Camus side (Camus has a different setting for the simple consumer). It would be great to auto-configure this if the user did not supply it in the configuration. Let me know if you agree with #2. Thanks, Bhavesh On Mon, Jan 12, 2015 at 9:25 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Bhavesh, This seems like a serious issue and not one anyone else has reported. I don't know what you mean by corrupt message, are you saying the CRC check fails? If so, that check is done both by the broker (prior to appending to the log) and the consumer, so that implies either a bug in the broker or else disk corruption on the server. I do have an option to disable the CRC check in the consumer, though depending on the nature of the corruption that can just lead to more serious errors (depending on what is corrupted). -jay On Sun, Jan 11, 2015 at 11:00 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jay, One of the pain points of the existing consumer code is the occasional CORRUPT_MESSAGE. Right now, it is hard to pin-point the cause of a CORRUPT_MESSAGE, especially when it happens on the Mirror Maker side.
Is there any proposal to auto-skip corrupted messages and have reporting visibility into CRC errors (metrics, etc., or traceability to find the corruption) per topic, etc.? I am not sure if this is the correct email thread to address this; if not, please let me know. Will provide feedback about the new consumer API and changes. Thanks, Bhavesh On Sun, Jan 11, 2015 at 7:57 PM, Jay Kreps j...@confluent.io wrote: I uploaded an updated version of the new consumer client ( https://issues.apache.org/jira/browse/KAFKA-1760). This is now almost feature complete, and has pretty reasonable testing and metrics. I think it is ready for review and could be checked in once 0.8.2 is out. For those who haven't been following, this is meant to be a new consumer client, like the new producer in 0.8.2, and intended to replace the existing high-level and simple scala consumers. This still needs the server-side implementation of the partition assignment and group management to be fully functional. I have just stubbed this out in the server to allow the implementation and testing of the client, but actual usage will require it. However the client that exists now is actually a fully functional replacement for the simple consumer that is vastly easier to use correctly as it internally does all the discovery and failover. It would be great if people could take a look at this code, and particularly at the public apis which have several small changes from the original proposal. Summary What's there: 1. Simple consumer functionality 2. Offset commit and fetch 3. Ability to change position with seek 4. Ability to commit all or just some offsets 5. Controller discovery, failure detection, heartbeat, and fail-over 6. Controller partition assignment 7. Logging 8. Metrics 9. Integration tests including tests that simulate random broker failures 10. Integration into the consumer performance test Limitations: 1.
There could be some lingering bugs in the group management support; it is hard to test fully with just the stub support on the server, so we'll need to get the server working to do better, I think. 2. I haven't implemented wild-card subscriptions yet. 3. No integration with console consumer yet. Performance I did some performance comparison with the old consumer over localhost on my laptop. Usually localhost isn't good for testing, but in this case it is good because it has near-infinite bandwidth, so it does a good job at catching inefficiencies that would be hidden with a slower network. These numbers probably aren't representative of what you would get over a real network, but help bring out the relative efficiencies. Here
Re: [VOTE] 0.8.2.0 Candidate 3
At Netflix, we have been using a route53 DNS name as bootstrap servers in the AWS env. Basically, when a kafka broker starts, we add it to the route53 DNS name for the cluster. this is like the VIP that Jay suggested. But we are also moving toward using the Eureka service registry for bootstrapping. We are worried that if the DNS name happens to resolve to a bad broker, it might impact the bootstrap process/resiliency. We want to get a list of brokers from Eureka to pass in as bootstrap.servers. On Sun, Feb 1, 2015 at 5:30 AM, Jay Kreps jay.kr...@gmail.com wrote: You may already know this, but the producer doesn't require a complete list of brokers in its config, it just requires the connection info for one active broker, which it uses to discover the rest of the brokers. We allow you to specify multiple urls here for failover in cases where you aren't using a vip. So if you can put three brokers into the VIP for metadata bootstrapping, you can still scale up and down the rest of the cluster. -Jay On Sun, Feb 1, 2015 at 12:17 AM, Alex The Rocker alex.m3...@gmail.com wrote: Jun: You raise a very good question: let me explain why we use Broker.getConnectionString(), so maybe we'll get a supported way to answer our need. We use Broker.getConnectionString() because we deploy Kafka services in Amazon EC2 with the following architecture: * Three VMs dedicated to Zookeeper processes * At least two VMs with Kafka brokers, but depending on load it can be scaled to more broker VMs. Brokers self-register their address in Zookeeper by serializing Broker objects into Zk. The VMs with Zookeeper have Elastic IPs = stable public IPs. These public IPs are fed to the various Application services which rely on Kafka to stream their logs monitoring data to our central Hadoop system.
Using zkclient and the above-mentioned public zookeeper IPs, we get the list of brokers registered to a given Kafka service: this is where we deserialize Broker objects and then use getConnectionString() to discover the brokers' addresses. Then, broker addresses are used to initialize the Kafka producer(s). The whole trick is that we cannot use Elastic IPs (= stable IPs) for Kafka VMs, because of their 'elastic' nature: we want to be able to scale up / down the number of VMs with Kafka brokers. Now, we understand that using non-public Kafka API is bad: we've been broken when moving to 0.8.1.1, then again when moving to 0.8.2.0... So it's time to raise the right question: what would be the supported way to configure our producers given our dynamic-IP-for-brokers context? Thanks, Alex. 2015-02-01 8:55 GMT+01:00 VERMEERBERGEN Alexandre alexandre.vermeerber...@3ds.com: -Original Message- From: Jun Rao [mailto:j...@confluent.io] Sent: Sunday, February 01, 2015 3:03 To: us...@kafka.apache.org; kafka-clie...@googlegroups.com Cc: dev@kafka.apache.org Subject: Re: [VOTE] 0.8.2.0 Candidate 3 Hi, Alex, Thanks for testing RC3. Broker.connectionString() is actually not part of the public api for the producer. Is there a particular reason that you need to use this api? Thanks, Jun On Sat, Jan 31, 2015 at 1:53 PM, Alex The Rocker alex.m3...@gmail.com wrote: Hello, I have read the Broker.scala source code, and I found the answer: - With Kafka 0.8.1.1 we used Broker.getConnectionString() in our Java code. - With Kafka 0.8.2.0, this method has been replaced by a 0-arity method without the get prefix, so we have to change our Java code to call Broker.connectionString(). So despite binary compatibility being broken, we have a by-pass. I hope this will help other people relying on this API... and I'm going to continue tests with 0.8.2 rc3.
Alex 2015-01-31 21:23 GMT+01:00 Alex The Rocker alex.m3...@gmail.com: Hello, I ran my own tests made with kafka_2.10-0.8.1.1.tgz binaries with our application:

1st test:
=========
replace all kafka .jar files in our application on the consuming side (without recompiling anything) => tests passed, OK

2nd test:
=========
replace all kafka .jar files in our application on the producing side (without recompiling anything) => KO, we get this error:

2015-01-31 20:54:00,094 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect - Exception in thread Timer-2
2015-01-31 20:54:00,111 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect - java.lang.NoSuchMethodError: kafka.cluster.Broker.getConnectionString()Ljava/lang/String;

Which means that binary compatibility with the 0.8.1.1 version has been broken. We use getConnectionString() to get the brokers' addresses, see this answer from Neha: http://mail-archives.apache.org/mod_mbox/kafka-users/201404.mbox/%3CCA
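The supported alternative the thread converges on is to feed whatever discovery source you have (Eureka, a route53 lookup, a VIP) into the client's public bootstrap.servers setting instead of deserializing internal Broker objects. A minimal sketch — the helper and host names are illustrative placeholders, not a real Eureka integration:

```java
// Hedged sketch: build producer config from a dynamically discovered
// broker list rather than the internal Broker.getConnectionString() API.
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class Bootstrap {
    static Properties producerConfig(List<String> discoveredBrokers) {
        Properties props = new Properties();
        // any subset of live brokers works; the client discovers the rest
        props.put("bootstrap.servers", String.join(",", discoveredBrokers));
        return props;
    }

    public static void main(String[] args) {
        List<String> fromRegistry = Arrays.asList("broker-a:9092", "broker-b:9092");
        System.out.println(producerConfig(fromRegistry).getProperty("bootstrap.servers"));
    }
}
```

Because bootstrap.servers is only used for the initial metadata fetch, the list can be stale or partial as long as at least one entry is reachable, which fits the dynamic-IP-for-brokers context described above.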
[DISCUSSION] generate explicit error/failing metrics
To illustrate my point, I will use the allTopicsOwnedPartitionsCount gauge from ZookeeperConsumerConnector as an example. It captures the number of partitions of a topic that have been assigned an owner in the consumer group. let's say that I have a topic with 9 partitions. this metric should normally report the value 9. I can set up an alert if allTopicsOwnedPartitionsCount < 9. here are the drawbacks of this kind of metric. 1) if our metrics report/aggregation system has data loss that causes the value to be reported as zero, we can't really distinguish whether it's a real error or data loss. so we can get false positives/alarms from data loss 2) if we change the number of partitions (e.g. from 9 to 18), we need to remember to change the alert rule to allTopicsOwnedPartitionsCount < 18. this kind of coupling is a maintenance nightmare. A more explicit metric is NoOwnerPartitionsCount. it should be zero normally. if it is not zero, we should be alerted. this way, we won't get false alarms from data loss. We don't have to change/fix this particular example since a new consumer is being worked on. But in the new consumer please consider more explicit error signals. Thanks, Steven
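The "explicit error metric" idea above can be sketched as follows: a NoOwnerPartitionsCount-style value that is 0 when healthy, so any non-zero reading means real trouble and a lost/zero reading is not a false alarm. Plain-Java stand-in; a real implementation would register this as a metrics-library Gauge, and the names here are assumptions:

```java
// Hedged sketch of an explicit, zero-when-healthy error metric.
import java.util.HashMap;
import java.util.Map;

public class OwnershipMetrics {
    // count partitions with no registered owner; alert when > 0,
    // independent of how many partitions the topic has
    static int noOwnerPartitionsCount(int totalPartitions,
                                      Map<Integer, String> ownerByPartition) {
        int unowned = 0;
        for (int p = 0; p < totalPartitions; p++)
            if (!ownerByPartition.containsKey(p)) unowned++;
        return unowned;
    }

    // demo helper: partitions 0..owned-1 are owned by a made-up consumer id
    static Map<Integer, String> owners(int owned) {
        Map<Integer, String> m = new HashMap<>();
        for (int p = 0; p < owned; p++) m.put(p, "consumer-1");
        return m;
    }

    public static void main(String[] args) {
        System.out.println(noOwnerPartitionsCount(9, owners(9))); // 0: healthy
        System.out.println(noOwnerPartitionsCount(9, owners(6))); // 3: alert
    }
}
```

The alert rule ("> 0") never has to change when the partition count grows from 9 to 18, which is the decoupling the post argues for.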
Re: Review Request 30158: Patch for KAFKA-1835
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30158/#review69515 --- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/30158/#comment114212 I would: - for-loop to call Metadata.add(topic); this way we add all topics to Metadata - call Metadata#requestUpdate() to trigger the Sender thread to request an update for all listed topics clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/30158/#comment114215 I would not use the initialized flag, as long as we fix KafkaProducer#waitOnMetadata to allow the value 0 for non-blocking. - Steven Wu On Jan. 22, 2015, 7:04 a.m., Paul Pearcy wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30158/ --- (Updated Jan. 22, 2015, 7:04 a.m.) Review request for kafka. Bugs: KAFKA-1835 https://issues.apache.org/jira/browse/KAFKA-1835 Repository: kafka Description --- KAFKA-1835 - New producer updates to make blocking behavior explicit Diffs - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java fc71710dd5997576d3841a1c3b0f7e19a8c9698e clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75 core/src/test/scala/integration/kafka/api/ProducerBlockingTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala ac15d34425795d5be20c51b01fa1108bdcd66583 Diff: https://reviews.apache.org/r/30158/diff/ Testing --- Thanks, Paul Pearcy
two questions regarding 0.8.2 producer code
I am checking out the source code of the 0.8.2 producer. I have two questions and hope to get some clarifications. 1) The Sender thread/Runnable has this run loop. what if the in-memory queue is mostly empty (i.e. the producer has very few msgs to send out)? will this become a simple tight loop that just wastes cpu?

    // main loop, runs until close is called
    while (running) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

2) Selector#poll() [line #200]

    if (transmissions.hasSend())
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");

2.1) since it's an async API, back-to-back sends to the same broker seem very possible to me. should we throw an exception here? 2.2) if this happens, it seems that we will silently drop the msgs without executing the callbacks? Thanks, Steven
Re: two questions regarding 0.8.2 producer code
Jay, thanks a lot for the quick response. For #2, I do see some isSendable() checks in Sender.java and NetworkClient.java that are eventually mapped to the InFlightRequests#canSendMore() check. it wasn't immediately clear to me how that translates to the transmissions.hasSend() check. anyway, I will dig a little more. On Mon, Nov 10, 2014 at 2:59 PM, Jay Kreps jay.kr...@gmail.com wrote: 1. No, the send does a poll/select on all the connections that will block for a specified time waiting for data to read or write on any connection. 2. The api of the selector only allows you to send a request to a ready connection. The definition of ready is that it doesn't have another request in the process of being written (it can have other requests outstanding that were previously written). So if you hit this case it is a programming error, and it should never happen in the producer. The write path for the data is: it is written to the internal queue/buffer, and the sender grabs data from that for ready connections and writes to them. This slightly complex ready/send api is required to allow back-pressure in the producer to work. -Jay On Mon, Nov 10, 2014 at 2:44 PM, Steven Wu stevenz...@gmail.com wrote: I am checking out the source code of the 0.8.2 producer. I have two questions and hope to get some clarifications. 1) The Sender thread/Runnable has this run loop. what if the in-memory queue is mostly empty (i.e. the producer has very few msgs to send out)? will this become a simple tight loop that just wastes cpu?

    // main loop, runs until close is called
    while (running) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

2) Selector#poll() [line #200]

    if (transmissions.hasSend())
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");

2.1) since it's an async API, back-to-back sends to the same broker seem very possible to me. should we throw an exception here?
2.2) if this happens, it seems that we will silently drop the msgs without executing callbacks? Thanks, Steven
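Jay's answer to #1 — that the loop is not a busy-wait because each iteration blocks inside the selector until a connection is ready or a timeout elapses — can be demonstrated with a standalone NIO snippet (this is a demo of the select() behavior, not Kafka's actual Sender code):

```java
// Hedged sketch: select(timeout) sleeps when nothing is ready, so a loop
// around it only "spins" as fast as data arrives or timeouts expire.
import java.io.IOException;
import java.nio.channels.Selector;

public class SenderLoopSketch {
    // block up to timeoutMs in select() and report how long we actually waited
    static long waitOnce(long timeoutMs) {
        try (Selector selector = Selector.open()) {
            long start = System.nanoTime();
            selector.select(timeoutMs); // no channels registered: sleeps until timeout
            return (System.nanoTime() - start) / 1_000_000;
        } catch (IOException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        // the idle loop costs one ~100ms sleep per iteration, not CPU
        for (int i = 0; i < 3; i++)
            System.out.println("waited ~" + waitOnce(100) + " ms");
    }
}
```

An idle producer therefore spends its run loop parked in the kernel's select, which is why the tight-loop concern in the original question doesn't apply.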
Re: How producer gets the acknowledgement back
It may be too late to change the Producer API now. I always find ListenableFuture very nice/usable. It essentially adds a callback to Future. It's a lot easier to chain/combine ListenableFutures. http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/util/concurrent/ListenableFuture.html On Wed, Sep 24, 2014 at 6:05 PM, Guozhang Wang wangg...@gmail.com wrote: Hi, In the new (Java) producer, you can pass in a callback function in the Future<RecordMetadata> send(ProducerRecord record, Callback callback) call, which will be triggered when the ack is received. Alternatively, you can also call Future.get() on the returned future metadata, which will block until the ack is received (i.e., synced sending). Guozhang On Wed, Sep 24, 2014 at 6:10 AM, Sreenivasulu Nallapati sreenu.nallap...@gmail.com wrote: Hello, Can you please help me to get the acknowledgement in the producer? After setting the property *request.required.acks to 1*, how does the producer get the acknowledgement back? I am trying to get the acknowledgement in the java producer. Thanks Sreeni -- -- Guozhang
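The Future-plus-Callback pattern Guozhang describes (and the chaining that ListenableFuture adds on top of a plain Future) can be illustrated with the JDK's CompletableFuture. Note sendAsync below is a stand-in for a real producer send — it is not Kafka's API — and offset 42 is made up for the demo:

```java
// Hedged sketch: chaining an ack callback onto an async send, the
// behavior ListenableFuture/CompletableFuture add over plain Future.
import java.util.concurrent.CompletableFuture;

public class AckCallback {
    static CompletableFuture<Long> sendAsync(String record) {
        // pretend the broker acks the record at a fixed offset
        return CompletableFuture.supplyAsync(() -> 42L);
    }

    public static void main(String[] args) {
        sendAsync("hello")
            .whenComplete((offset, err) -> {
                if (err != null) System.err.println("send failed: " + err);
                else System.out.println("acked at offset " + offset);
            })
            .join(); // block only so the demo prints before exiting
    }
}
```

With Kafka's actual API the same two options exist side by side: pass a Callback to send() for the async path, or call get() on the returned Future for the blocking path.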