Re: [DISCUSS] KIP-1008: ParKa - the Marriage of Parquet and Kafka

2023-11-26 Thread Steven Wu
>  if we can produce the segment with Parquet, which is the native format
> in a data lake, the consumer application (e.g., Spark jobs for ingestion)
> can directly dump the segments as raw byte buffers into the data lake
> without unwrapping each record individually and then writing to the
> Parquet file one by one with the expensive steps of encoding and
> compression again.

This sounds like an interesting idea. I have one concern though. Data
Lake/table formats (like Delta Lake, Hudi, Iceberg) have column-level
statistics, which are important for query performance. How would column
stats be handled in this proposal?
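
For context, Parquet keeps per-column-chunk min/max/null-count statistics in
the file footer, so a segment produced this way would still carry them. A
minimal parquet-mr sketch of reading those stats; the file name and setup are
illustrative:

---
ParquetFileReader reader = ParquetFileReader.open(
    HadoopInputFile.fromPath(new Path("segment.parquet"), new Configuration()));
for (BlockMetaData block : reader.getFooter().getBlocks()) {
    for (ColumnChunkMetaData column : block.getColumns()) {
        Statistics<?> stats = column.getStatistics();
        // table formats typically lift these values into their own metadata
        System.out.printf("%s min=%s max=%s nulls=%d%n",
            column.getPath(), stats.minAsString(), stats.maxAsString(),
            stats.getNumNulls());
    }
}
reader.close();
---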

On Tue, Nov 21, 2023 at 9:21 AM Xinli shang  wrote:

> Hi, all
>
> Can I ask for a discussion on the just-created KIP-1008: ParKa - the
> Marriage of Parquet and Kafka
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1008%3A+ParKa+-+the+Marriage+of+Parquet+and+Kafka
> >
> ?
>
> --
> Xinli Shang
>


Re: Review Request 33760: Patch for KAFKA-2121

2015-05-01 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33760/
---

(Updated May 1, 2015, 10:42 p.m.)


Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description (updated)
---

override the java.io.Closeable#close method in the Serializer and Deserializer 
interfaces without throwing the checked IOException. this is to avoid breaking 
source compatibility.


add a test for checking Serializer is closed during KafkaProducer#close


add missing copyright header from previous checkin


removed throws Exception from test methods


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
9a57579f87cb19cb6affe6d157ff8446c23e3551 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
c44054038066f0d0829d05f082b2ee42b34cded7 
  
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java 
eea2c28450736d1668c68828f77a49470a82c3d0 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
49f1427bcbe43c773920a25aa69a71d0329296b7 
  clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 
6f948f240c906029a0f972bf770f288f390ea714 
  clients/src/test/java/org/apache/kafka/test/MockSerializer.java PRE-CREATION 

Diff: https://reviews.apache.org/r/33760/diff/


Testing
---


Thanks,

Steven Wu



Review Request 33760: Patch for KAFKA-2121

2015-05-01 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33760/
---

Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description
---

override the java.io.Closeable#close method in the Serializer and Deserializer 
interfaces without throwing the checked IOException. this is to avoid breaking 
source compatibility.


add a test for checking Serializer is closed during KafkaProducer#close


Diffs
-

  clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
9a57579f87cb19cb6affe6d157ff8446c23e3551 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
c44054038066f0d0829d05f082b2ee42b34cded7 
  
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java 
eea2c28450736d1668c68828f77a49470a82c3d0 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
49f1427bcbe43c773920a25aa69a71d0329296b7 
  clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 
6f948f240c906029a0f972bf770f288f390ea714 
  clients/src/test/java/org/apache/kafka/test/MockSerializer.java PRE-CREATION 

Diff: https://reviews.apache.org/r/33760/diff/


Testing
---


Thanks,

Steven Wu



Re: [DISCUSSION] java.io.Closeable in KAFKA-2121

2015-04-28 Thread Steven Wu
On Tue, Apr 28, 2015 at 1:03 PM, Ewen Cheslack-Postava e...@confluent.io
wrote:

 Good point Jay. More specifically we were already implementing without the
 checked exception, we'd need to override close() in the Serializer and
 Deserializer interfaces and omit the throws clause. That definitely makes
 them source compatible. Not sure about binary compatibility, I couldn't
 find a quick answer but I think it's probably still compatible.

 -Ewen

 On Tue, Apr 28, 2015 at 12:30 PM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey guys,
 
  You can implement Closable without the checked exception. Having close()
  methods throw checked exceptions isn't very useful unless there is a way
  for the caller to recover. In this case there really isn't, right?
 
  -Jay
 
  On Mon, Apr 27, 2015 at 5:51 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Folks,
  
    In a recent commit I made regarding KAFKA-2121, there is an omitted API
    change which makes Serializer / Deserializer extend from Closeable,
    whose close() call could throw IOException by declaration. Hence now a
    scenario like:
  
   -
  
    Serializer<T> keySerializer = ...
    Serializer<T> valueSerializer = ...
    KafkaProducer producer = new KafkaProducer(config, keySerializer,
        valueSerializer)
    // ...
    keySerializer.close()
    valueSerializer.close()
  
   -
  
    will need to catch IOException now.
  
    Want to bring this up for people's attention, and get your opinion on
    whether we should revert this change?
  
   -- Guozhang
  
 



 --
 Thanks,
 Ewen



Re: [DISCUSSION] java.io.Closeable in KAFKA-2121

2015-04-28 Thread Steven Wu
sorry for the previous empty msg.

Jay's idea should work. basically, we override the close method in the
Serializer interface:

public interface Serializer<T> extends Closeable {
    @Override
    public void close();
}
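
With that override in place, callers no longer have to handle the checked
IOException; an illustrative usage (StringSerializer is the stock
org.apache.kafka.common.serialization implementation):

Serializer<String> keySerializer = new StringSerializer();
// ... pass it to a KafkaProducer, use it ...
keySerializer.close();   // no checked IOException to catch anymore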

On Tue, Apr 28, 2015 at 1:10 PM, Steven Wu stevenz...@gmail.com wrote:



 On Tue, Apr 28, 2015 at 1:03 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:

 Good point Jay. More specifically we were already implementing without the
 checked exception, we'd need to override close() in the Serializer and
 Deserializer interfaces and omit the throws clause. That definitely makes
 them source compatible. Not sure about binary compatibility, I couldn't
 find a quick answer but I think it's probably still compatible.

 -Ewen

 On Tue, Apr 28, 2015 at 12:30 PM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey guys,
 
  You can implement Closable without the checked exception. Having close()
  methods throw checked exceptions isn't very useful unless there is a way
  for the caller to recover. In this case there really isn't, right?
 
  -Jay
 
  On Mon, Apr 27, 2015 at 5:51 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Folks,
  
    In a recent commit I made regarding KAFKA-2121, there is an omitted API
    change which makes Serializer / Deserializer extend from Closeable,
    whose close() call could throw IOException by declaration. Hence now a
    scenario like:
  
   -
  
    Serializer<T> keySerializer = ...
    Serializer<T> valueSerializer = ...
    KafkaProducer producer = new KafkaProducer(config, keySerializer,
        valueSerializer)
    // ...
    keySerializer.close()
    valueSerializer.close()
  
   -
  
    will need to catch IOException now.
  
    Want to bring this up for people's attention, and get your opinion on
    whether we should revert this change?
  
   -- Guozhang
  
 



 --
 Thanks,
 Ewen





Review Request 33654: Patch for KAFKA-2121

2015-04-28 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33654/
---

Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description
---

override the java.io.Closeable#close method in the Serializer and Deserializer 
interfaces without throwing the checked IOException. this is to avoid breaking 
source compatibility.


Diffs
-

  clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
9a57579f87cb19cb6affe6d157ff8446c23e3551 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
c44054038066f0d0829d05f082b2ee42b34cded7 
  
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java 
eea2c28450736d1668c68828f77a49470a82c3d0 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
49f1427bcbe43c773920a25aa69a71d0329296b7 
  clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 
6f948f240c906029a0f972bf770f288f390ea714 

Diff: https://reviews.apache.org/r/33654/diff/


Testing
---


Thanks,

Steven Wu



Review Request 33574: Patch for KAFKA-2151

2015-04-26 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33574/
---

Review request for kafka.


Bugs: KAFKA-2151
https://issues.apache.org/jira/browse/KAFKA-2151


Repository: kafka


Description
---

make MockMetricsReporter a little more generic


Diffs
-

  
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java 
eea2c28450736d1668c68828f77a49470a82c3d0 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
49f1427bcbe43c773920a25aa69a71d0329296b7 
  clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 
6f948f240c906029a0f972bf770f288f390ea714 

Diff: https://reviews.apache.org/r/33574/diff/


Testing
---


Thanks,

Steven Wu



Re: Review Request 33242: Patch for KAFKA-2121

2015-04-20 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/
---

(Updated April 21, 2015, 5:48 a.m.)


Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description (updated)
---

move MockMetricsReporter into clients/src/test/java/org/apache/kafka/test


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
96ac6d0cca990eebe90707465d7d8091c069a4b2 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
21243345311a106f0802ce96c026ba6e815ccf99 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b91e2c52ed0acb1faa85915097d97bafa28c413a 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
  clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
b3d3d7c56acb445be16a3fbe00f05eaba659be46 
  clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
13be6a38cb356d55e25151776328a3c38c573db4 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
c2fdc23239bd2196cd912c3d121b591f21393eab 
  
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java 
PRE-CREATION 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
PRE-CREATION 
  clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33242/diff/


Testing
---


Thanks,

Steven Wu



Re: Review Request 33242: Patch for KAFKA-2121

2015-04-20 Thread Steven Wu


 On April 21, 2015, 3:08 a.m., Guozhang Wang wrote:
  LGTM, besides one minor suggestion: could you move MockMetricsReporter to 
  clients/src/test/java/org/apache/kafka/test?

done. moved MockMetricsReporter to clients/src/test/java/org/apache/kafka/test


- Steven


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/#review80892
---


On April 21, 2015, 5:48 a.m., Steven Wu wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33242/
 ---
 
 (Updated April 21, 2015, 5:48 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2121
 https://issues.apache.org/jira/browse/KAFKA-2121
 
 
 Repository: kafka
 
 
 Description
 ---
 
 move MockMetricsReporter into clients/src/test/java/org/apache/kafka/test
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
 d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 96ac6d0cca990eebe90707465d7d8091c069a4b2 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 21243345311a106f0802ce96c026ba6e815ccf99 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 b91e2c52ed0acb1faa85915097d97bafa28c413a 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
   clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
 b3d3d7c56acb445be16a3fbe00f05eaba659be46 
   
 clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
 13be6a38cb356d55e25151776328a3c38c573db4 
   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
 c2fdc23239bd2196cd912c3d121b591f21393eab 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
  PRE-CREATION 
   
 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  PRE-CREATION 
   clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 
 PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/33242/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Steven Wu
 




Re: Review Request 33242: Patch for KAFKA-2121

2015-04-20 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/
---

(Updated April 20, 2015, 4:51 p.m.)


Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description
---

fix potential resource leak when the KafkaProducer constructor fails in the middle


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
96ac6d0cca990eebe90707465d7d8091c069a4b2 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
21243345311a106f0802ce96c026ba6e815ccf99 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b91e2c52ed0acb1faa85915097d97bafa28c413a 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
  clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
b3d3d7c56acb445be16a3fbe00f05eaba659be46 
  clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
13be6a38cb356d55e25151776328a3c38c573db4 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
c2fdc23239bd2196cd912c3d121b591f21393eab 
  
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java 
PRE-CREATION 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33242/diff/


Testing
---


Thanks,

Steven Wu



Re: Review Request 33242: Patch for KAFKA-2121

2015-04-20 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/
---

(Updated April 20, 2015, 4:57 p.m.)


Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description
---

fix potential resource leak when the KafkaProducer constructor fails in the middle


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
96ac6d0cca990eebe90707465d7d8091c069a4b2 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
21243345311a106f0802ce96c026ba6e815ccf99 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b91e2c52ed0acb1faa85915097d97bafa28c413a 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
  clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
b3d3d7c56acb445be16a3fbe00f05eaba659be46 
  clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
13be6a38cb356d55e25151776328a3c38c573db4 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
c2fdc23239bd2196cd912c3d121b591f21393eab 
  
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java 
PRE-CREATION 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33242/diff/


Testing
---


Thanks,

Steven Wu



Re: Review Request 33242: Patch for KAFKA-2121

2015-04-20 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/
---

(Updated April 20, 2015, 4:52 p.m.)


Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description
---

fix potential resource leak when the KafkaProducer constructor fails in the middle


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
96ac6d0cca990eebe90707465d7d8091c069a4b2 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
21243345311a106f0802ce96c026ba6e815ccf99 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b91e2c52ed0acb1faa85915097d97bafa28c413a 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
  clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
b3d3d7c56acb445be16a3fbe00f05eaba659be46 
  clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
13be6a38cb356d55e25151776328a3c38c573db4 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
c2fdc23239bd2196cd912c3d121b591f21393eab 
  
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java 
PRE-CREATION 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33242/diff/


Testing
---


Thanks,

Steven Wu



Re: Review Request 33242: Patch for KAFKA-2121

2015-04-19 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/
---

(Updated April 20, 2015, 3:08 a.m.)


Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description (updated)
---

applied same fix to KafkaConsumer


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
96ac6d0cca990eebe90707465d7d8091c069a4b2 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
21243345311a106f0802ce96c026ba6e815ccf99 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b91e2c52ed0acb1faa85915097d97bafa28c413a 
  clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
b3d3d7c56acb445be16a3fbe00f05eaba659be46 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
c2fdc23239bd2196cd912c3d121b591f21393eab 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33242/diff/


Testing
---


Thanks,

Steven Wu



Re: Review Request 33242: Patch for KAFKA-2121

2015-04-19 Thread Steven Wu


 On April 19, 2015, 11:11 p.m., Ewen Cheslack-Postava wrote:
  LGTM! If this gets merged as is, we should file a follow-up issue for the 
  new consumer, which has the same issue.

OK. I applied the same fix for the new consumer. also updated the jira title to 
reflect the expanded scope.


- Steven


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/#review80642
---


On April 20, 2015, 3:30 a.m., Steven Wu wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33242/
 ---
 
 (Updated April 20, 2015, 3:30 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2121
 https://issues.apache.org/jira/browse/KAFKA-2121
 
 
 Repository: kafka
 
 
 Description
 ---
 
 applied same fix to KafkaConsumer
 
 
 add test for KafkaConsumer
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
 d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 96ac6d0cca990eebe90707465d7d8091c069a4b2 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 21243345311a106f0802ce96c026ba6e815ccf99 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 b91e2c52ed0acb1faa85915097d97bafa28c413a 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
   clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
 b3d3d7c56acb445be16a3fbe00f05eaba659be46 
   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
 c2fdc23239bd2196cd912c3d121b591f21393eab 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
  PRE-CREATION 
   
 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/33242/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Steven Wu
 




Re: Review Request 33242: Patch for KAFKA-2121

2015-04-19 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/
---

(Updated April 20, 2015, 3:30 a.m.)


Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description (updated)
---

applied same fix to KafkaConsumer


add test for KafkaConsumer


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
d0da5d7a08a0c3e67e0fe14bb0b0e7c73380f416 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
96ac6d0cca990eebe90707465d7d8091c069a4b2 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
21243345311a106f0802ce96c026ba6e815ccf99 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b91e2c52ed0acb1faa85915097d97bafa28c413a 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
  clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
b3d3d7c56acb445be16a3fbe00f05eaba659be46 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
c2fdc23239bd2196cd912c3d121b591f21393eab 
  
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java 
PRE-CREATION 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33242/diff/


Testing
---


Thanks,

Steven Wu



Re: Review Request 33242: Patch for KAFKA-2121

2015-04-18 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/
---

(Updated April 19, 2015, 3:09 a.m.)


Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description (updated)
---

fix potential resource leak when the KafkaProducer constructor fails in the middle


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b91e2c52ed0acb1faa85915097d97bafa28c413a 
  clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
b3d3d7c56acb445be16a3fbe00f05eaba659be46 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
c2fdc23239bd2196cd912c3d121b591f21393eab 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33242/diff/


Testing
---


Thanks,

Steven Wu



Re: Review Request 33242: Patch for KAFKA-2121

2015-04-18 Thread Steven Wu


 On April 16, 2015, 5:29 p.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
  line 548
  https://reviews.apache.org/r/33242/diff/2/?file=931792#file931792line548
 
  One idea for making this less verbose and redundant: make all of these 
  classes implement Closeable so we can just write one utility method for 
  trying to close something and catching the exception.
 
 Steven Wu wrote:
 yes. I thought about it. it may break binary compatibility, e.g. 
 Serializer. Sender and Metrics classes are probably only used internally. let 
 me know your thoughts.
 
 Ewen Cheslack-Postava wrote:
 I'm pretty sure it's fine, based on this:
 
 Changing the direct superclass or the set of direct superinterfaces of a 
 class type will not break compatibility with pre-existing binaries, provided 
 that the total set of superclasses or superinterfaces, respectively, of the 
 class type loses no members.
 
 from https://docs.oracle.com/javase/specs/jls/se7/html/jls-13.html

ok. you could be correct. here is another reference (easier to understand than 
the jls doc)

"Expand superinterface set (direct or inherited)" - Binary compatible

from https://wiki.eclipse.org/Evolving_Java-based_APIs_2#Evolving_API_Interfaces
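
For reference, a minimal sketch of the utility-method idea discussed above;
the name, signature, and logging are illustrative, not necessarily what was
committed:

---
private static void closeQuietly(Closeable closeable, String name,
                                 AtomicReference<Throwable> firstException) {
    if (closeable != null) {
        try {
            closeable.close();
        } catch (Throwable t) {
            // remember only the first failure, log the rest
            firstException.compareAndSet(null, t);
            log.error("Failed to close " + name, t);
        }
    }
}
---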


- Steven


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/#review80346
---


On April 19, 2015, 3:09 a.m., Steven Wu wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33242/
 ---
 
 (Updated April 19, 2015, 3:09 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2121
 https://issues.apache.org/jira/browse/KAFKA-2121
 
 
 Repository: kafka
 
 
 Description
 ---
 
fix potential resource leak when the KafkaProducer constructor fails in the middle
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 b91e2c52ed0acb1faa85915097d97bafa28c413a 
   clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
 b3d3d7c56acb445be16a3fbe00f05eaba659be46 
   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
 c2fdc23239bd2196cd912c3d121b591f21393eab 
   
 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/33242/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Steven Wu
 




Re: Review Request 33242: Patch for KAFKA-2121

2015-04-16 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/
---

(Updated April 16, 2015, 4:55 p.m.)


Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description (updated)
---

add a unit test file


changes based on Ewen's review feedback


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b91e2c52ed0acb1faa85915097d97bafa28c413a 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33242/diff/


Testing
---


Thanks,

Steven Wu



Re: Review Request 33242: Patch for KAFKA-2121

2015-04-16 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/
---

(Updated April 16, 2015, 5:03 p.m.)


Review request for kafka.


Changes
---

address Ewen's review feedback

I'm getting a bunch of checkstyle complaints when I try to test. These should 
all be easy to fix (and should be causing tests to fail before even running). 
The only rule that might not be obvious from the error message is that the 
static final field in MockMetricsReporter is expected to be all-caps since it 
looks like a constant to checkstyle.
 fixed

In the constructor, could we throw some subclass of KafkaException instead? The 
new clients try to stick to that exception hierarchy except in a few special 
cases. Alternatively, maybe if we caught Error and RuntimeException instead of 
Throwable then we could just rethrow the same exception?
 I changed RuntimeException to KafkaException. can't think of a good subclass 
 name for this scenario. ProducerConstructException? hence, staying with the 
 generic KafkaException.

The new version of close() will swallow exceptions when called normally (i.e. 
not from the constructor). They'll be logged, but the caller won't see the 
exception anymore. Maybe we should save the first exception and rethrow it?
 refactored a private close(boolean swallowException) method

Exception messages should be capitalized.
 fixed

In the test, we should probably have an assert outside the catch. And is there 
any reason the closeCount is being reset to 0?
 yes. we should have an assert outside the catch
 I just reset the CLOSE_COUNT in case another test method needs to check 
 the count.
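
A minimal sketch of the refactored close(boolean swallowException) described
above; the shape is illustrative (it assumes a closeQuietly helper that
records the first failure), not the exact patch:

---
private void close(boolean swallowException) {
    AtomicReference<Throwable> firstException = new AtomicReference<>();
    closeQuietly(this.metrics, "producer metrics", firstException);
    closeQuietly(this.keySerializer, "producer keySerializer", firstException);
    closeQuietly(this.valueSerializer, "producer valueSerializer", firstException);
    // called from the constructor's catch block: swallow, so the original
    // construction failure is what the caller sees; called from the public
    // close(): rethrow the first close failure
    if (!swallowException && firstException.get() != null)
        throw new KafkaException("Failed to close kafka producer",
            firstException.get());
}

@Override
public void close() {
    close(false);
}
---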


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description (updated)
---

fix potential resource leak when KafkaProducer throws an exception in the middle


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b91e2c52ed0acb1faa85915097d97bafa28c413a 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33242/diff/


Testing
---


Thanks,

Steven Wu



Re: Review Request 33242: Patch for KAFKA-2121

2015-04-16 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/
---

(Updated April 16, 2015, 5:44 p.m.)


Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description (updated)
---

add a unit test file


changes based on Ewen's review feedback


fix capitalization in error log


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b91e2c52ed0acb1faa85915097d97bafa28c413a 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33242/diff/


Testing
---


Thanks,

Steven Wu



Re: Review Request 33242: Patch for KAFKA-2121

2015-04-16 Thread Steven Wu


 On April 16, 2015, 5:29 p.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
  line 531
  https://reviews.apache.org/r/33242/diff/2/?file=931792#file931792line531
 
  This code is all single threaded, is the AtomicReference really 
  necessary here?

not really necessary. just trying to use compareAndSet. otherwise, I need to 
do if (firstException == null) firstException = t. I can certainly change 
it. let me know.


 On April 16, 2015, 5:29 p.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
  line 548
  https://reviews.apache.org/r/33242/diff/2/?file=931792#file931792line548
 
  One idea for making this less verbose and redundant: make all of these 
  classes implement Closeable so we can just write one utility method for 
  trying to close something and catching the exception.

yes. I thought about it. it may break binary compatibility, e.g. Serializer. 
Sender and Metrics classes are probably only used internally. let me know your 
thoughts.


- Steven


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/#review80346
---


On April 16, 2015, 5:44 p.m., Steven Wu wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33242/
 ---
 
 (Updated April 16, 2015, 5:44 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2121
 https://issues.apache.org/jira/browse/KAFKA-2121
 
 
 Repository: kafka
 
 
 Description
 ---
 
 add a unit test file
 
 
  changes based on Ewen's review feedback
 
 
 fix capitalization in error log
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 b91e2c52ed0acb1faa85915097d97bafa28c413a 
   
 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/33242/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Steven Wu
 




Re: Review Request 33242: Patch for KAFKA-2121

2015-04-16 Thread Steven Wu


 On April 16, 2015, 5:29 p.m., Ewen Cheslack-Postava wrote:
  Looks good, left a few comments.
  
  KafkaConsumer suffers from this same problem. Patching that should be 
  pretty much identical -- any chance you could extend this to cover that as 
  well?

sure. I can extend this to KafkaConsumer later.


- Steven


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/#review80346
---


On April 16, 2015, 5:44 p.m., Steven Wu wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33242/
 ---
 
 (Updated April 16, 2015, 5:44 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2121
 https://issues.apache.org/jira/browse/KAFKA-2121
 
 
 Repository: kafka
 
 
 Description
 ---
 
 add a unit test file
 
 
  changes based on Ewen's review feedback
 
 
 fix capitalization in error log
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 b91e2c52ed0acb1faa85915097d97bafa28c413a 
   
 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/33242/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Steven Wu
 




Review Request 33242: Patch for KAFKA-2121

2015-04-15 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/
---

Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description
---

add a unit test file


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b91e2c52ed0acb1faa85915097d97bafa28c413a 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33242/diff/


Testing
---


Thanks,

Steven Wu



Re: [DISCUSS] error handling in java KafkaProducer

2015-04-14 Thread Steven Wu
Thanks, Ewen and Guozhang!

I will go with the try-catch option then. here is the jira. feel free to
assign it to me. I will try to submit a patch this week.
https://issues.apache.org/jira/browse/KAFKA-2121

On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:

 It is a valid problem and we should correct it as soon as possible, I'm
 with Ewen regarding the solution.

 On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:

  Steven,

  Looks like there is even more that could potentially be leaked -- since
  key and value serializers are created and configured at the end, even the
  IO thread allocated by the producer could leak. Given that, I think 1
  isn't a great option since, as you said, it doesn't really address the
  underlying issue.

  3 strikes me as bad from a user experience perspective. It's true we
  might want to introduce additional constructors to make testing easier,
  but the more components I need to allocate myself and inject into the
  producer's constructor, the worse the default experience is. And since
  you would have to inject the dependencies to get correct, non-leaking
  behavior, it will always be more code than previously (and a backwards
  incompatible change). Additionally, the code creating the producer would
  have to be more complicated since it would have to deal with the cleanup
  carefully, whereas it previously just had to deal with the exception.
  Besides, for testing specifically, you can avoid exposing more
  constructors just for testing by using something like PowerMock that lets
  you mock private methods. That requires a bit of code reorganization, but
  doesn't affect the public interface at all.

  So my take is that a variant of 2 is probably best. I'd probably do two
  things. First, make close() safe to call even if some fields haven't been
  initialized, which presumably just means checking for null fields. (You
  might also want to figure out if all the methods close() calls are
  idempotent and decide whether some fields should be marked non-final and
  cleared to null when close() is called). Second, add the try/catch as you
  suggested, but just use close().

  -Ewen
 
 
  On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
 
    Here is the resource leak problem that we have encountered when the
    0.8.2 java KafkaProducer failed in the constructor. here is the code
    snippet of KafkaProducer to illustrate the problem.

    ---
    public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
                         Serializer<V> valueSerializer) {

        // create metrics reporters via reflection
        List<MetricsReporter> reporters =
            config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class);

        // validate bootstrap servers
        List<InetSocketAddress> addresses =
            ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));

    }
    ---

    let's say MyMetricsReporter creates a thread in its constructor. if
    hostname validation threw an exception, the constructor won't call the
    close method of MyMetricsReporter to clean up the resource. as a result,
    we created a thread leak issue. this becomes worse when we try to auto
    recover (i.e. keep creating KafkaProducer again -> failing again -> more
    thread leaks).

    there are multiple options for fixing this.

    1) just move the hostname validation to the beginning. but this only
    fixes one symptom. it doesn't fix the fundamental problem. what if some
    other lines throw an exception.

    2) use try-catch. in the catch section, try to call close methods for
    any non-null objects constructed so far.

    3) explicitly declare the dependency in the constructor. this way, when
    KafkaProducer throws an exception, I can call the close method of the
    metrics reporters to release resources.
    KafkaProducer(..., List<MetricsReporter> reporters)
    we don't have to use a dependency injection framework. but generally,
    hiding dependencies is a bad coding practice. it is also hard to plug in
    mocks for dependencies. this is probably the most intrusive change.

    I am willing to submit a patch. but I'd like to hear your opinions on
    how we should fix the issue.

    Thanks,
    Steven
  
 
 
 
  --
  Thanks,
  Ewen
 



 --
 -- Guozhang



Re: [DISCUSS] error handling in java KafkaProducer

2015-04-14 Thread Steven Wu
I submitted a patch attempt in the jira.

On Tue, Apr 14, 2015 at 10:16 AM, Steven Wu stevenz...@gmail.com wrote:

 Thanks, Ewen and Guozhang!

 I will go with the try-catch option then. here is the jira. feel free to
 assign it to me. I will try to submit a patch this week.
 https://issues.apache.org/jira/browse/KAFKA-2121


 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:

 It is a valid problem and we should correct it as soon as possible, I'm
 with Ewen regarding the solution.

 On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io
 
 wrote:

  Steven,

  Looks like there is even more that could potentially be leaked -- since
  key and value serializers are created and configured at the end, even the
  IO thread allocated by the producer could leak. Given that, I think 1
  isn't a great option since, as you said, it doesn't really address the
  underlying issue.

  3 strikes me as bad from a user experience perspective. It's true we
  might want to introduce additional constructors to make testing easier,
  but the more components I need to allocate myself and inject into the
  producer's constructor, the worse the default experience is. And since
  you would have to inject the dependencies to get correct, non-leaking
  behavior, it will always be more code than previously (and a backwards
  incompatible change). Additionally, the code creating the producer would
  have to be more complicated since it would have to deal with the cleanup
  carefully, whereas it previously just had to deal with the exception.
  Besides, for testing specifically, you can avoid exposing more
  constructors just for testing by using something like PowerMock that lets
  you mock private methods. That requires a bit of code reorganization, but
  doesn't affect the public interface at all.

  So my take is that a variant of 2 is probably best. I'd probably do two
  things. First, make close() safe to call even if some fields haven't been
  initialized, which presumably just means checking for null fields. (You
  might also want to figure out if all the methods close() calls are
  idempotent and decide whether some fields should be marked non-final and
  cleared to null when close() is called). Second, add the try/catch as you
  suggested, but just use close().

  -Ewen
 
 
  On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com
 wrote:
 
    Here is the resource leak problem that we have encountered when the
    0.8.2 java KafkaProducer failed in the constructor. here is the code
    snippet of KafkaProducer to illustrate the problem.

    ---
    public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
                         Serializer<V> valueSerializer) {

        // create metrics reporters via reflection
        List<MetricsReporter> reporters =
            config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class);

        // validate bootstrap servers
        List<InetSocketAddress> addresses =
            ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));

    }
    ---

    let's say MyMetricsReporter creates a thread in its constructor. if
    hostname validation threw an exception, the constructor won't call the
    close method of MyMetricsReporter to clean up the resource. as a result,
    we created a thread leak issue. this becomes worse when we try to auto
    recover (i.e. keep creating KafkaProducer again -> failing again -> more
    thread leaks).

    there are multiple options for fixing this.

    1) just move the hostname validation to the beginning. but this only
    fixes one symptom. it doesn't fix the fundamental problem. what if some
    other lines throw an exception.

    2) use try-catch. in the catch section, try to call close methods for
    any non-null objects constructed so far.

    3) explicitly declare the dependency in the constructor. this way, when
    KafkaProducer throws an exception, I can call the close method of the
    metrics reporters to release resources.
    KafkaProducer(..., List<MetricsReporter> reporters)
    we don't have to use a dependency injection framework. but generally,
    hiding dependencies is a bad coding practice. it is also hard to plug in
    mocks for dependencies. this is probably the most intrusive change.

    I am willing to submit a patch. but I'd like to hear your opinions on
    how we should fix the issue.

    Thanks,
    Steven
  
 
 
 
  --
  Thanks,
  Ewen
 



 --
 -- Guozhang





[DISCUSS] error handling in java KafkaProducer

2015-04-13 Thread Steven Wu
Here is the resource leak problem that we have encountered when the 0.8.2
java KafkaProducer failed in the constructor. here is the code snippet of
KafkaProducer to illustrate the problem.

---
public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
                     Serializer<V> valueSerializer) {

    // create metrics reporters via reflection
    List<MetricsReporter> reporters =
        config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
            MetricsReporter.class);

    // validate bootstrap servers
    List<InetSocketAddress> addresses =
        ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));

}
---

let's say MyMetricsReporter creates a thread in its constructor. if hostname
validation threw an exception, the constructor won't call the close method
of MyMetricsReporter to clean up the resource. as a result, we created a
thread leak issue. this becomes worse when we try to auto recover (i.e.
keep creating KafkaProducer again -> failing again -> more thread leaks).

there are multiple options for fixing this.

1) just move the hostname validation to the beginning. but this only fixes
one symptom. it doesn't fix the fundamental problem. what if some other
lines throw an exception.

2) use try-catch. in the catch section, try to call close methods for any
non-null objects constructed so far.

3) explicitly declare the dependency in the constructor. this way, when
KafkaProducer throws an exception, I can call the close method of the
metrics reporters to release resources.
KafkaProducer(..., List<MetricsReporter> reporters)
we don't have to use a dependency injection framework. but generally,
hiding dependencies is a bad coding practice. it is also hard to plug in
mocks for dependencies. this is probably the most intrusive change.

I am willing to submit a patch. but I'd like to hear your opinions on how
we should fix the issue.

Thanks,
Steven
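
A minimal sketch of option 2 combined with the null-safe close() suggested in
the replies; the structure is illustrative, not the committed KAFKA-2121
patch:

---
public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
                     Serializer<V> valueSerializer) {
    try {
        // each construction step below may throw
        this.reporters = config.getConfiguredInstances(
            ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
        this.addresses = ClientUtils.parseAndValidateAddresses(
            config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        // ... remaining initialization ...
    } catch (Throwable t) {
        // option 2: release whatever was constructed so far, then rethrow;
        // close(true) must tolerate fields that are still null
        close(true /* swallow close errors */);
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}
---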


Re: Metrics package discussion

2015-03-31 Thread Steven Wu
 My main concern is that if we don't do the migration in 0.8.3, we will be
 left with some metrics in YM format and some others in KM format (as we
 start sharing client code on the broker). This is probably a worse
 situation to be in.

+1. I am not sure how our servo adaptor would work if there are two formats
for metrics, unless there is an easy way to check the format (YM/KM).


On Tue, Mar 31, 2015 at 9:40 AM, Jun Rao j...@confluent.io wrote:

 (2) The metrics are clearly part of the client API and we are not changing
 that (at least for the new client). Arguably, the metrics are also part of
 the broker side API. However, since they affect fewer parties (mostly just
 the Kafka admins), it may be easier to make those changes.

 My main concern is that if we don't do the migration in 0.8.3, we will be
 left with some metrics in YM format and some others in KM format (as we
 start sharing client code on the broker). This is probably a worse
 situation to be in.

 Thanks,

 Jun

 On Tue, Mar 31, 2015 at 9:26 AM, Gwen Shapira gshap...@cloudera.com
 wrote:

  (2) I believe we agreed that our metrics are a public API. I believe
  we also agree we don't break APIs in minor releases. So, it seems
  obvious to me that we can't make breaking changes to metrics in minor
  releases. I'm not convinced "we did it in the past" is a good reason
  to do it again.

  Is there a strong reason to do it in a 0.8.3 time-frame?

  On Tue, Mar 31, 2015 at 7:59 AM, Jun Rao j...@confluent.io wrote:
   (2) Not sure why we can't do this in 0.8.3. We changed the metrics
   names in 0.8.2 already. Given that we need to share code between the
   client and the core, and we need to keep the metrics consistent on
   the broker, it seems that we have no choice but to migrate to KM. If
   so, it seems that the sooner we do this, the better. It is important
   to give people an easy path for migration. However, it may not be
   easy to keep the mbean names exactly the same. For example, YM has
   hardcoded attributes (e.g. 1-min-rate, 5-min-rate, 15-min-rate, etc.
   for rates) that are not available in KM.

   One benefit out of this migration is that one can get the metrics in
   the client and the broker in the same way.

   Thanks,

   Jun

   On Mon, Mar 30, 2015 at 9:26 PM, Gwen Shapira gshap...@cloudera.com
   wrote:

    (1) It will be interesting to see what others use for monitoring
    integration, to see what is already covered with existing JMX
    integrations and what needs special support.

    (2) I think the migration story is more important - this is a
    non-compatible change, right? So we can't do it in the 0.8.3
    timeframe, it has to be in 0.9? And we need to figure out how users
    will migrate - do we just tell everyone "please reconfigure all
    your monitors from scratch - don't worry, it is worth it"?
    I know you keep saying we did it before and our users are used to
    it, but I think there are a lot more users now, and some of them
    have different compatibility expectations. We probably need to find:
    * The least painful way to migrate - can we keep the names of at
    least most of the metrics intact?
    * A good explanation of what users gain from this painful migration
    (i.e. more accurate statistics due to gazillion histograms)

    On Mon, Mar 30, 2015 at 6:29 PM, Jun Rao j...@confluent.io wrote:
     If we are committed to migrating the broker side metrics to KM for
     the next release, we will need to (1) have a story on supporting
     common reporters (as listed in KAFKA-1930), and (2) see if the
     current histogram support is good enough for measuring things like
     request time.

     Thanks,

     Jun

     On Mon, Mar 30, 2015 at 3:03 PM, Aditya Auradkar 
     aaurad...@linkedin.com.invalid wrote:

      If we do plan to use the network code in the client, I think that
      is a good reason in favor of migration. It will be unnecessary to
      have metrics from multiple libraries coexist, since our users
      will have to start monitoring these new metrics anyway.

      I also agree with Jay that in multi-tenant clusters people care
      about detailed statistics for their own application over global
      numbers.

      Based on the arguments so far, I'm +1 for migrating to KM.

      Thanks,
      Aditya

      
      From: Jun Rao [j...@confluent.io]
      Sent: Sunday, March 29, 2015 9:44 AM
      To: dev@kafka.apache.org
      Subject: Re: Metrics package discussion

      There is another thing to consider. We plan to reuse the client
      components on the server side over time. For example, as part of
      the security work, we are looking into replacing the server side
      network code with the client network code (KAFKA-1928). However,
      the client network already has metrics based on KM.

      Thanks,

      Jun

      On Sat, Mar 28, 2015 at 1:34 PM, Jay Kreps jay.kr...@gmail.com
      wrote:

       I 

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-20 Thread Steven Wu
   separately. That will also contain a section on quotas.

   3. Dynamic Configuration management - Being discussed in KIP-5.
   Basically we need something that will model default quotas and allow
   per-client overrides.

   Is there something else that I'm missing?

   Thanks,
   Aditya
   
   From: Jay Kreps [jay.kr...@gmail.com]
   Sent: Wednesday, March 18, 2015 2:10 PM
   To: dev@kafka.apache.org
   Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

   Hey Steven,

   The current proposal is actually to enforce quotas at the
   client/application level, NOT the topic level. So if you have a service
   with a few dozen instances, the quota is against all of those instances
   added up across all their topics. So actually the effect would be the
   same either way, but throttling gives the producer the choice of either
   blocking or dropping.

   -Jay

   On Tue, Mar 17, 2015 at 10:08 AM, Steven Wu stevenz...@gmail.com wrote:

    Jay,

    let's say an app produces to 10 different topics. one of the topics is
    sent from a library. due to whatever condition/bug, this lib starts to
    send messages over the quota. if we go with the delayed-response
    approach, it will cause the whole shared RecordAccumulator buffer to
    be filled up. that will penalize the other 9 topics that are within
    the quota. that is the unfairness point that Ewen and I were trying
    to make.

    if the broker just drops the msg and returns an error/status code that
    indicates the drop and why, then the producer can just move on and
    accept the drop. the shared buffer won't be saturated and the other 9
    topics won't be penalized.

    Thanks,
    Steven

    On Tue, Mar 17, 2015 at 9:44 AM, Jay Kreps jay.kr...@gmail.com wrote:

     Hey Steven,

     It is true that hitting the quota will cause back-pressure on the
     producer. But the solution is simple: a producer that wants to avoid
     this should stay under its quota. In other words this is a contract
     between the cluster and the client, with each side having something
     to uphold. Quite possibly the same thing will happen in the absence
     of a quota: a client that produces an unexpected amount of load will
     hit the limits of the server and experience backpressure. Quotas just
     allow you to set that same limit at something lower than 100% of all
     resources on the server, which is useful for a shared cluster.

     -Jay

     On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com
     wrote:

      wait. we create one kafka producer for each cluster. each cluster
      can have many topics. if the producer buffer got filled up due to a
      delayed response for one throttled topic, won't that penalize other
      topics unfairly? it seems to me that the broker should just return
      an error without delay.

      sorry that I am chatting to myself :)

      On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com
      wrote:

       I think I can answer my own question. a delayed response will
       cause the producer buffer to be full, which then results in either
       thread blocking or message drops.

       On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com
       wrote:

        please correct me if I am missing sth here. I am not
        understanding how throttling would work without
        cooperation/back-off from the producer. the new Java producer
        supports a non-blocking API. why would a delayed response be
        able to slow down the producer? the producer will continue to
        fire async sends.

        On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang
        wangg...@gmail.com wrote:

         I think we are really discussing two separate issues here:

         1. Whether we should a)
         append-then-block-then-returnOKButThrottled or b)
         block-then-returnFailDuetoThrottled for quota actions on
         produce requests.

         Both these approaches assume some kind of well-behaveness of
         the clients: option a) assumes the client sets a proper timeout
         value while

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-17 Thread Steven Wu
wait. we create one kafka producer for each cluster. each cluster can have
many topics. if the producer buffer got filled up due to a delayed response
for one throttled topic, won't that penalize other topics unfairly? it seems
to me that the broker should just return an error without delay.

sorry that I am chatting to myself :)

On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com wrote:

 I think I can answer my own question. a delayed response will cause the
 producer buffer to be full, which then results in either thread blocking or
 message drops.

 On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote:

 please correct me if I am missing sth here. I am not understanding how
 throttling would work without cooperation/back-off from the producer. the
 new Java producer supports a non-blocking API. why would a delayed response
 be able to slow down the producer? the producer will continue to fire async
 sends.
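
A small sketch of the non-blocking usage in question (config values are
illustrative): send() returns immediately, so a throttled topic's delayed
acks are felt only indirectly, by keeping batches in the shared buffer until
it fills up.

---
Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("buffer.memory", "33554432");  // one buffer shared by all 10 topics

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000000; i++) {
    // fire-and-forget: no per-send back-pressure until the buffer is exhausted
    producer.send(new ProducerRecord<>("topic-" + (i % 10), "value-" + i),
        (metadata, exception) -> {
            if (exception != null)
                System.err.println("send failed: " + exception);
        });
}
producer.close();
---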

 On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com
 wrote:

 I think we are really discussing two separate issues here:

 1. Whether we should a) append-then-block-then-returnOKButThrottled or b)
 block-then-returnFailDuetoThrottled for quota actions on produce
 requests.

 Both these approaches assume some kind of well-behaveness of the clients:
 option a) assumes the client sets a proper timeout value while it can just
 ignore the OKButThrottled response, while option b) assumes the client
 handles the FailDuetoThrottled appropriately. For any malicious clients
 that, for example, just keep retrying either intentionally or not, neither
 of these approaches is actually effective.

 2. For OKButThrottled and FailDuetoThrottled responses, shall we
 encode
 them as error codes or augment the protocol to use a separate field
 indicating status codes.

 Today we have already incorporated some status codes as error codes in the
 responses, e.g. ReplicaNotAvailable in MetadataResponse. The pro of this
 is of course using a single field for response status like the HTTP status
 codes, while the con is that it requires clients to handle the error codes
 carefully.

 I think maybe we can actually extend the single-code approach to overcome
 its drawbacks, that is, wrap the error-code semantics for users so that
 they do not need to handle the codes one by one. More concretely,
 following Jay's example the client could write sth. like this:


 -

   if(error.isOK())
  // status code is good or the code can be simply ignored for this
 request type, process the request
   else if(error.needsRetry())
  // throttled, transient error, etc: retry
   else if(error.isFatal())
  // non-retriable errors, etc: notify / terminate / other handling

 -

 Only when the clients really want to handle, for example
 FailDuetoThrottled
 status code specifically, it needs to:

   if(error.isOK())
  // status code is good or the code can be simply ignored for this
 request type, process the request
   else if(error == FailDuetoThrottled )
  // throttled: log it
   else if(error.needsRetry())
  // transient error, etc: retry
   else if(error.isFatal())
  // non-retriable errors, etc: notify / terminate / other handling

 -

 And for implementation we can probably group the codes accordingly like
 HTTP status code such that we can do:

 boolean Error.isOK() {
   return code < 300 && code >= 200;
 }

 Guozhang
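
    A hypothetical sketch of that grouping in Java -- none of these status
    names or ranges exist in the Kafka protocol; they only borrow the HTTP
    convention:

        public enum ResponseStatus {
            OK(200),
            OK_BUT_THROTTLED(229),        // 2xx: succeeded, possibly with a warning
            FAIL_DUE_TO_THROTTLED(429),   // 4xx: failed but retriable
            UNKNOWN_SERVER_ERROR(500);    // 5xx: fatal

            private final int code;

            ResponseStatus(int code) { this.code = code; }

            public boolean isOK()       { return code >= 200 && code < 300; }
            public boolean needsRetry() { return code >= 400 && code < 500; }
            public boolean isFatal()    { return code >= 500; }
        }

    Callers could then write the isOK()/needsRetry()/isFatal() chain above
    without enumerating individual codes.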

 On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava 
 e...@confluent.io
 wrote:

  Agreed that trying to shoehorn non-error codes into the error field is
 a
  bad idea. It makes it *way* too easy to write code that looks (and
 should
  be) correct but is actually incorrect. If necessary, I think it's much
  better to spend a couple of extra bytes to encode that information
  separately (a status or warning section of the response). An
 indication
  that throttling is occurring is something I'd expect to be indicated
 by a
  bit flag in the response rather than as an error code.
 
  Gwen - I think an error code makes sense when the request actually
 failed.
  Option B, which Jun was advocating, would have appended the messages
  successfully. If the rate-limiting case you're talking about had
  successfully committed the messages, I would say that's also a bad use
 of
  error codes.
 
 
  On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
   We discussed an error code for rate-limiting (which I think made
   sense), isn't it a similar case?
  
   On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com
 wrote:
My concern is that as soon as you start encoding non-error response
information into error codes the next question is what to do if two
  such
codes apply (i.e. you have a replica down and the response is
  quota'd). I
think I am trying to argue that error should mean why we failed
 your
request, for which there will really only be one reason, and any
 other
useful

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-17 Thread Steven Wu
I think I can answer my own question. delayed response will cause the
producer buffer to be full, which then results in either thread blocking or
message drop.

On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote:

 please correct me if I am missing sth here. I am not understanding how
 would throttle work without cooperation/back-off from producer. new Java
 producer supports non-blocking API. why would delayed response be able to
 slow down producer? producer will continue to fire async sends.

 On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com
 wrote:

 I think we are really discussing two separate issues here:

 1. Whether we should a) append-then-block-then-returnOKButThrottled or b)
 block-then-returnFailDuetoThrottled for quota actions on produce requests.

 Both these approaches assume some kind of well-behaveness of the clients:
  option a) assumes the client sets a proper timeout value while it can just
 ignore OKButThrottled response, while option b) assumes the client
 handles the FailDuetoThrottled appropriately. For any malicious clients
 that, for example, just keep retrying either intentionally or not, neither
 of these approaches are actually effective.

 2. For OKButThrottled and FailDuetoThrottled responses, shall we
 encode
 them as error codes or augment the protocol to use a separate field
 indicating status codes.

 Today we have already incorporated some status code as error codes in the
 responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros of this
 is of course using a single field for response status like the HTTP status
 codes, while the cons is that it requires clients to handle the error
 codes
 carefully.

 I think maybe we can actually extend the single-code approach to overcome
 its drawbacks, that is, wrap the error codes semantics to the users so
 that
 users do not need to handle the codes one-by-one. More concretely,
 following Jay's example the client could write sth. like this:


 -

   if(error.isOK())
  // status code is good or the code can be simply ignored for this
 request type, process the request
   else if(error.needsRetry())
  // throttled, transient error, etc: retry
   else if(error.isFatal())
  // non-retriable errors, etc: notify / terminate / other handling

 -

 Only when the clients really want to handle, for example
 FailDuetoThrottled
 status code specifically, it needs to:

   if(error.isOK())
  // status code is good or the code can be simply ignored for this
 request type, process the request
   else if(error == FailDuetoThrottled )
  // throttled: log it
   else if(error.needsRetry())
  // transient error, etc: retry
   else if(error.isFatal())
  // non-retriable errors, etc: notify / terminate / other handling

 -

 And for implementation we can probably group the codes accordingly like
 HTTP status code such that we can do:

 boolean Error.isOK() {
    return code < 300 && code >= 200;
 }

 Guozhang

 On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava 
 e...@confluent.io
 wrote:

  Agreed that trying to shoehorn non-error codes into the error field is a
  bad idea. It makes it *way* too easy to write code that looks (and
 should
  be) correct but is actually incorrect. If necessary, I think it's much
   better to spend a couple of extra bytes to encode that information
  separately (a status or warning section of the response). An
 indication
  that throttling is occurring is something I'd expect to be indicated by
 a
  bit flag in the response rather than as an error code.
 
  Gwen - I think an error code makes sense when the request actually
 failed.
  Option B, which Jun was advocating, would have appended the messages
  successfully. If the rate-limiting case you're talking about had
  successfully committed the messages, I would say that's also a bad use
 of
  error codes.
 
 
  On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
   We discussed an error code for rate-limiting (which I think made
   sense), isn't it a similar case?
  
   On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com
 wrote:
My concern is that as soon as you start encoding non-error response
information into error codes the next question is what to do if two
  such
codes apply (i.e. you have a replica down and the response is
  quota'd). I
think I am trying to argue that error should mean why we failed
 your
request, for which there will really only be one reason, and any
 other
useful information we want to send back is just another field in the
response.
   
-Jay
   
On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira 
 gshap...@cloudera.com
   wrote:
   
 I think it's not too late to reserve a set of error codes (200-299?)
for non-error codes.
   
 It won't be backward compatible (i.e. clients that currently do "else
 throw" will throw on non-errors), but perhaps

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-17 Thread Steven Wu
please correct me if I am missing sth here. I am not understanding how
would throttle work without cooperation/back-off from producer. new Java
producer supports non-blocking API. why would delayed response be able to
slow down producer? producer will continue to fire async sends.

On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote:

 I think we are really discussing two separate issues here:

 1. Whether we should a) append-then-block-then-returnOKButThrottled or b)
 block-then-returnFailDuetoThrottled for quota actions on produce requests.

 Both these approaches assume some kind of well-behaveness of the clients:
  option a) assumes the client sets a proper timeout value while it can just
 ignore OKButThrottled response, while option b) assumes the client
 handles the FailDuetoThrottled appropriately. For any malicious clients
 that, for example, just keep retrying either intentionally or not, neither
 of these approaches are actually effective.

 2. For OKButThrottled and FailDuetoThrottled responses, shall we encode
 them as error codes or augment the protocol to use a separate field
 indicating status codes.

 Today we have already incorporated some status code as error codes in the
 responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros of this
 is of course using a single field for response status like the HTTP status
 codes, while the cons is that it requires clients to handle the error codes
 carefully.

 I think maybe we can actually extend the single-code approach to overcome
 its drawbacks, that is, wrap the error codes semantics to the users so that
 users do not need to handle the codes one-by-one. More concretely,
 following Jay's example the client could write sth. like this:


 -

   if(error.isOK())
  // status code is good or the code can be simply ignored for this
 request type, process the request
   else if(error.needsRetry())
  // throttled, transient error, etc: retry
   else if(error.isFatal())
  // non-retriable errors, etc: notify / terminate / other handling

 -

 Only when the clients really want to handle, for example FailDuetoThrottled
 status code specifically, it needs to:

   if(error.isOK())
  // status code is good or the code can be simply ignored for this
 request type, process the request
   else if(error == FailDuetoThrottled )
  // throttled: log it
   else if(error.needsRetry())
  // transient error, etc: retry
   else if(error.isFatal())
  // non-retriable errors, etc: notify / terminate / other handling

 -

 And for implementation we can probably group the codes accordingly like
 HTTP status code such that we can do:

 boolean Error.isOK() {
    return code < 300 && code >= 200;
 }

 Guozhang

 On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava e...@confluent.io
 
 wrote:

  Agreed that trying to shoehorn non-error codes into the error field is a
  bad idea. It makes it *way* too easy to write code that looks (and should
  be) correct but is actually incorrect. If necessary, I think it's much
   better to spend a couple of extra bytes to encode that information
  separately (a status or warning section of the response). An
 indication
  that throttling is occurring is something I'd expect to be indicated by a
  bit flag in the response rather than as an error code.
 
  Gwen - I think an error code makes sense when the request actually
 failed.
  Option B, which Jun was advocating, would have appended the messages
  successfully. If the rate-limiting case you're talking about had
  successfully committed the messages, I would say that's also a bad use of
  error codes.
 
 
  On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
   We discussed an error code for rate-limiting (which I think made
   sense), isn't it a similar case?
  
   On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com
 wrote:
My concern is that as soon as you start encoding non-error response
information into error codes the next question is what to do if two
  such
codes apply (i.e. you have a replica down and the response is
  quota'd). I
think I am trying to argue that error should mean why we failed your
request, for which there will really only be one reason, and any
 other
useful information we want to send back is just another field in the
response.
   
-Jay
   
On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira gshap...@cloudera.com
 
   wrote:
   
 I think it's not too late to reserve a set of error codes (200-299?)
for non-error codes.
   
 It won't be backward compatible (i.e. clients that currently do "else
 throw" will throw on non-errors), but perhaps it's worthwhile.
   
On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps jay.kr...@gmail.com
  wrote:
 Hey Jun,

 I'd really really really like to avoid that. Having just spent a
   bunch of
 time on the clients, using the error codes to 

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-17 Thread Steven Wu
Ewen, I see your point regarding the shared buffer. yes, a bad/slow broker
could potentially use up the whole buffer. On the other hand, I do like the
batching behavior of the shared RecordAccumulator buffer.

On Tue, Mar 17, 2015 at 8:25 AM, Guozhang Wang wangg...@gmail.com wrote:

 Ewen,

 1. I think we are on the same page as per malicious clients, that it
 should not be the target of either approach. I was just trying to separate
 the discussion from "what if user just keep retrying" and maybe I was not
 clear.

 2. I was not advocating option A on the wiki, in my previous email I
 actually assumed that option was already dropped and we are only considering
 option B (which is my option b) in the email) and C (option a) in my
 email), and I think with some proper wrapping of status codes (today we
 still call them error codes) option B in the wiki may not necessarily
 require people who implement clients to handle each status code one-by-one.

 Guozhang

 On Tue, Mar 17, 2015 at 12:22 AM, Ewen Cheslack-Postava e...@confluent.io
 
 wrote:

  Steven - that's a reasonable concern. I think I've mentioned the same
 sort
  of issue in the issues about the new producer's RecordAccumulator not
  timing out sends, e.g. in
 https://issues.apache.org/jira/browse/KAFKA-1788
  .
  The shared buffer causes problems if one broker isn't available for
 awhile
  since messages to that broker end up consuming the entire buffer. You can
  end up with a similar problem here due to the effective rate limiting
  caused by delaying responses.
 
  Guozhang - I think only option A from the KIP is actually an error. If we
  want to look to HTTP for examples, there's an RFC that defines the "Too
  Many Requests" response to handle rate limiting:
  http://tools.ietf.org/html/rfc6585#page-3 In this case, it actually is
 an
  error, specifically a client error since it's in the 400 range. The
  implication from the status code (429), name of the response, and the
  example given is that it is an error and no real data is returned, which
  would correspond to option A from the KIP. Note that the protocol
 provides
  a mechanism for giving extra (optional) information about when you should
  retry (via headers). I'd guess that even despite that, most systems that
  encounter a 429 use some ad hoc backoff mechanism because they only try
 to
  detect anything in the 400 range...
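
  For reference, a minimal sketch of RFC 6585-style 429 handling over plain
  HTTP -- an analogy only, since the Kafka protocol has no Retry-After
  equivalent:

      import java.net.HttpURLConnection;
      import java.net.URL;

      public class TooManyRequestsDemo {
          static void sendWithBackoff(URL url) throws Exception {
              HttpURLConnection conn = (HttpURLConnection) url.openConnection();
              if (conn.getResponseCode() == 429) {       // Too Many Requests
                  String retryAfter = conn.getHeaderField("Retry-After");
                  long waitMs = (retryAfter != null)
                          ? Long.parseLong(retryAfter) * 1000L  // server hint
                          : 1000L;                              // ad hoc backoff
                  Thread.sleep(waitMs);
                  // re-issue the request here
              }
          }
      }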
 
  One additional point -- I think malicious clients shouldn't be our
 target
  here, they can do a lot worse than what's been addressed in this thread.
  But I do agree that any proposal should have a clear explanation of how
  existing clients that are ignorant of quotas would behave (which is why
  options b and c make a lot of sense -- they rate limit without requiring
 an
  update to normally-behaving clients).
 
 
  On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com
 wrote:
 
   wait. we create one kafka producer for each cluster. each cluster can
  have
   many topics. if producer buffer got filled up due to delayed response
 for
   one throttled topic, won't that penalize other topics unfairly? it
 seems
  to
   me that broker should just return error without delay.
  
   sorry that I am chatting to myself :)
  
   On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com
  wrote:
  
I think I can answer my own question. delayed response will cause the
 producer buffer to be full, which then results in either thread
 blocking
   or
message drop.
   
On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com
   wrote:
   
please correct me if I am missing sth here. I am not understanding
 how
would throttle work without cooperation/back-off from producer. new
  Java
producer supports non-blocking API. why would delayed response be
 able
   to
slow down producer? producer will continue to fire async sends.
   
On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com
 
wrote:
   
I think we are really discussing two separate issues here:
   
1. Whether we should a) append-then-block-then-returnOKButThrottled
  or
   b)
block-then-returnFailDuetoThrottled for quota actions on produce
requests.
   
Both these approaches assume some kind of well-behaveness of the
   clients:
 option a) assumes the client sets a proper timeout value while it can
   just
ignore OKButThrottled response, while option b) assumes the
 client
handles the FailDuetoThrottled appropriately. For any malicious
   clients
that, for example, just keep retrying either intentionally or not,
neither
of these approaches are actually effective.
   
2. For OKButThrottled and FailDuetoThrottled responses, shall
 we
encode
them as error codes or augment the protocol to use a separate field
indicating status codes.
   
Today we have already incorporated some status code as error codes
 in
   the
responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-17 Thread Steven Wu
Jay,

let's say an app produces to 10 different topics. one of the topics is sent
from a library. due to whatever condition/bug, this lib starts to send
messages over the quota. if we go with the delayed response approach, it
will cause the whole shared RecordAccumulator buffer to be filled up. that
will penalize the other 9 topics that are within the quota. that is the
unfairness point that Ewen and I were trying to make.

if the broker just drops the msg and returns an error/status code that
indicates the drop and why, then the producer can just move on and accept the
drop. the shared buffer won't be saturated and the other 9 topics won't be
penalized.

Thanks,
Steven
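
A minimal sketch of that failure mode against the 0.8.2 Java producer API
(props and payload are assumed to be defined; the topic name and the quota
behavior are made up):

    Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
    while (true) {
        // "hot-topic" is over its quota: with delayed responses its batches
        // sit in the shared RecordAccumulator until buffer.memory is
        // exhausted, which then blocks or drops sends for the other 9 topics.
        producer.send(new ProducerRecord<>("hot-topic", payload), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                // with an immediate error code instead of a delay, the client
                // could drop/log here and the shared buffer would stay healthy
            }
        });
    }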



On Tue, Mar 17, 2015 at 9:44 AM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Steven,

 It is true that hitting the quota will cause back-pressure on the producer.
 But the solution is simple, a producer that wants to avoid this should stay
 under its quota. In other words this is a contract between the cluster and
 the client, with each side having something to uphold. Quite possibly the
 same thing will happen in the absence of a quota, a client that produces an
 unexpected amount of load will hit the limits of the server and experience
 backpressure. Quotas just allow you to set that same limit at something
 lower than 100% of all resources on the server, which is useful for a
 shared cluster.

 -Jay
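
 One way a well-behaved client could uphold its side of that contract is a
 small limiter in front of send(); this is purely illustrative, nothing like
 it ships with the producer:

     public class ByteRateLimiter {
         private final long bytesPerSec;
         private double tokens;
         private long lastNanos = System.nanoTime();

         public ByteRateLimiter(long bytesPerSec) {
             this.bytesPerSec = bytesPerSec;
             this.tokens = bytesPerSec;   // start with one second of budget
         }

         public synchronized void acquire(int bytes) throws InterruptedException {
             while (true) {
                 long now = System.nanoTime();
                 tokens = Math.min((double) bytesPerSec,
                                   tokens + (now - lastNanos) / 1e9 * bytesPerSec);
                 lastNanos = now;
                 if (tokens >= bytes) {
                     tokens -= bytes;     // spend budget and proceed
                     return;
                 }
                 Thread.sleep(10);        // wait for tokens to accrue
             }
         }
     }

 Calling limiter.acquire(value.length) before each send() keeps the client
 under, say, a 1 MB/s quota without ever hitting the broker-side throttle.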

 On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com wrote:

  wait. we create one kafka producer for each cluster. each cluster can
 have
  many topics. if producer buffer got filled up due to delayed response for
  one throttled topic, won't that penalize other topics unfairly? it seems
 to
  me that broker should just return error without delay.
 
  sorry that I am chatting to myself :)
 
  On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com
 wrote:
 
   I think I can answer my own question. delayed response will cause the
    producer buffer to be full, which then results in either thread blocking
  or
   message drop.
  
   On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com
  wrote:
  
   please correct me if I am missing sth here. I am not understanding how
   would throttle work without cooperation/back-off from producer. new
 Java
   producer supports non-blocking API. why would delayed response be able
  to
   slow down producer? producer will continue to fire async sends.
  
   On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com
   wrote:
  
   I think we are really discussing two separate issues here:
  
   1. Whether we should a) append-then-block-then-returnOKButThrottled
 or
  b)
   block-then-returnFailDuetoThrottled for quota actions on produce
   requests.
  
   Both these approaches assume some kind of well-behaveness of the
  clients:
    option a) assumes the client sets a proper timeout value while it can
  just
   ignore OKButThrottled response, while option b) assumes the client
   handles the FailDuetoThrottled appropriately. For any malicious
  clients
   that, for example, just keep retrying either intentionally or not,
   neither
   of these approaches are actually effective.
  
   2. For OKButThrottled and FailDuetoThrottled responses, shall we
   encode
   them as error codes or augment the protocol to use a separate field
   indicating status codes.
  
   Today we have already incorporated some status code as error codes in
  the
   responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros of
  this
   is of course using a single field for response status like the HTTP
   status
   codes, while the cons is that it requires clients to handle the error
   codes
   carefully.
  
   I think maybe we can actually extend the single-code approach to
  overcome
   its drawbacks, that is, wrap the error codes semantics to the users
 so
   that
   users do not need to handle the codes one-by-one. More concretely,
   following Jay's example the client could write sth. like this:
  
  
   -
  
 if(error.isOK())
// status code is good or the code can be simply ignored for
 this
   request type, process the request
 else if(error.needsRetry())
// throttled, transient error, etc: retry
 else if(error.isFatal())
// non-retriable errors, etc: notify / terminate / other
 handling
  
   -
  
   Only when the clients really want to handle, for example
   FailDuetoThrottled
   status code specifically, it needs to:
  
 if(error.isOK())
// status code is good or the code can be simply ignored for
 this
   request type, process the request
 else if(error == FailDuetoThrottled )
// throttled: log it
 else if(error.needsRetry())
// transient error, etc: retry
 else if(error.isFatal())
// non-retriable errors, etc: notify / terminate / other
 handling
  
   -
  
   And for implementation we can probably group the codes accordingly
 like
   HTTP status code such that we can do

Re: New consumer client

2015-02-10 Thread Steven Wu
Jay, we have observed CRC corruption too occasionally. I reported it in
another thread and asked how we should handle some error conditions from the
old high-level consumer.

On Mon, Feb 9, 2015 at 11:36 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com
wrote:

 Hi Jay,

 1) Sorry to get back to you so late.  It is a CRC check error on any consumer
 thread regardless of the server.  What happens is I have to catch this
 exception and skip the message now.  There is no option to re-fetch this
 message.  Is there any way to add behavior in the Java consumer to re-fetch
 this CRC-failed offset?


 2)  Secondly, can you please add default behavior to auto-set
 'fetch.message.max.bytes' = broker's message.max.bytes.  This will ensure
 smooth configuration for both the simple and high-level consumer, and will
 take the burden of configuring this property away from the Kafka user.  We
 had a lag issue due to this misconfiguration and dropped messages on the
 Camus side (Camus has a different setting for its simple consumer).  It
 would be great to auto-config this if the user did not supply it in the
 configuration.

 Let me know if you agree with #2.

 Thanks,

 Bhavesh
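
 A sketch of the coupling described in #2 (the sizes are illustrative;
 fetch.message.max.bytes is the old high-level consumer's setting and
 message.max.bytes the broker's):

     // broker side (server.properties): message.max.bytes=1000012
     Properties consumerProps = new Properties();
     // must be >= the broker's message.max.bytes, otherwise the consumer
     // cannot make progress past the first over-sized message and lag
     // builds up behind it
     consumerProps.put("fetch.message.max.bytes", "1000012");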

 On Mon, Jan 12, 2015 at 9:25 AM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey Bhavesh,
 
  This seems like a serious issue and not one anyone else has reported. I
  don't know what you mean by corrupt message, are you saying the CRC check
  fails? If so, that check is done both by the broker (prior to appending
 to
  the log) and the consumer so that implies either a bug in the broker or
  else disk corruption on the server.
 
  I do have an option to disable the CRC check in the consumer, though
  depending on the nature of the corruption that can just lead to more
  serious errors (depending on what is corrupted).
 
  -jay
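
  The toggle Jay mentions is, in the new consumer client, the check.crcs
  setting (it defaults to true); a minimal sketch:

      Properties props = new Properties();
      props.put("check.crcs", "false");  // skip CRC32 validation of fetched records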
 
  On Sun, Jan 11, 2015 at 11:00 PM, Bhavesh Mistry 
  mistry.p.bhav...@gmail.com
   wrote:
 
   Hi Jay,
  
    One of the pain points of the existing consumer code is the occasional
    CORRUPT_MESSAGE. Right now, it is hard to pinpoint the problem of
    CORRUPT_MESSAGE, especially when this happens on the Mirror Maker side. Is
    there any proposal to auto-skip corrupted messages and have reporting
    visibility of CRC errors (metrics, traceability to find corruption, per
    topic, etc.)?
   I am not sure if this is correct email thread to address this if not
  please
   let me know.
  
   Will provide feedback about new consumer api and changes.
   Thanks,
  
   Bhavesh
  
   On Sun, Jan 11, 2015 at 7:57 PM, Jay Kreps j...@confluent.io wrote:
  
I uploaded an updated version of the new consumer client (
https://issues.apache.org/jira/browse/KAFKA-1760). This is now
 almost
feature complete, and has pretty reasonable testing and metrics. I
  think
   it
is ready for review and could be checked in once 0.8.2 is out.
   
For those who haven't been following this is meant to be a new
 consumer
client, like the new producer in 0.8.2, and intended to replace the
existing high level and simple scala consumers.
   
This still needs the server-side implementation of the partition
   assignment
and group management to be fully functional. I have just stubbed this
  out
in the server to allow the implementation and testing of the server
 but
actual usage will require it. However the client that exists now is
actually a fully functional replacement for the simple consumer
 that
  is
vastly easier to use correctly as it internally does all the
 discovery
   and
failover.
   
It would be great if people could take a look at this code, and
particularly at the public apis which have several small changes from
  the
original proposal.
   
Summary
   
What's there:
1. Simple consumer functionality
2. Offset commit and fetch
3. Ability to change position with seek
4. Ability to commit all or just some offsets
5. Controller discovery, failure detection, heartbeat, and fail-over
6. Controller partition assignment
7. Logging
8. Metrics
9. Integration tests including tests that simulate random broker
  failures
10. Integration into the consumer performance test
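
For a feel of items 1-4 above, a short sketch against the consumer API
roughly as it later stabilized (the patch under review differs in detail;
imports and props omitted):

    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
    TopicPartition tp = new TopicPartition("my-topic", 0);
    consumer.assign(Collections.singletonList(tp)); // simple-consumer style (1)
    consumer.seek(tp, 42L);                         // change position (3)
    ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
    consumer.commitSync();                          // offset commit (2, 4)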
   
Limitations:
1. There could be some lingering bugs in the group management
 support,
  it
is hard to fully test fully with just the stub support on the server,
  so
we'll need to get the server working to do better I think.
2. I haven't implemented wild-card subscriptions yet.
3. No integration with console consumer yet
   
Performance
   
I did some performance comparison with the old consumer over
 localhost
  on
my laptop. Usually localhost isn't good for testing but in this case
 it
   is
good because it has near infinite bandwidth so it does a good job at
catching inefficiencies that would be hidden with a slower network.
  These
numbers probably aren't representative of what you would get over a
  real
network, but help bring out the relative efficiencies.
Here 

Re: [VOTE] 0.8.2.0 Candidate 3

2015-02-01 Thread Steven Wu
In Netflix, we have been using a route53 DNS name as bootstrap servers in the
AWS env. Basically, when a kafka broker starts, we add it to the route53 DNS
name for the cluster. this is like the VIP that Jay suggested.

But we are also moving toward using the Eureka service registry for
bootstrapping. We are worried that if the DNS name happens to resolve to a
bad broker, it might impact the bootstrap process/resiliency. We want to get
a list of brokers from Eureka to pass in as bootstrap.servers.
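
For reference, a minimal producer bootstrap sketch matching what Jay
describes in the reply below; the host names are made up, and only one of
them needs to be reachable:

    Properties props = new Properties();
    props.put("bootstrap.servers",
            "broker1.example.com:9092,broker2.example.com:9092,broker3.example.com:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    Producer<byte[], byte[]> producer = new KafkaProducer<>(props);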



On Sun, Feb 1, 2015 at 5:30 AM, Jay Kreps jay.kr...@gmail.com wrote:

 You may already know this but the producer doesn't require a complete list
 of brokers in its config, it just requires the connection info for one
 active broker which it uses to discover the rest of the brokers. We allow
 you to specify multiple urls here for failover in cases where you aren't
 using a vip. So if you can put three brokers into the VIP for metadata
 bootstrapping you can still scale up and down the rest of the cluster.

 -Jay

 On Sun, Feb 1, 2015 at 12:17 AM, Alex The Rocker alex.m3...@gmail.com
 wrote:

  Jun:
 
  You raise a very good question: let me explain why we use
  Broker.getConnectionString(), so maybe we'll get a supported way to
  answer our need.
 
  We use Broker.getConnectionString() because we deploy Kafka services
  in Amazon EC2 with the following architecture:
  * Three VMs dedicated to Zookeeper processes
  * At least two VMs with Kafka broker, but depending on load it can be
  scaled to more broker VMs. Brokers self-register their address in
  Zookeeper by serializing Broker objects in Zk.
 
  The VMs with Zookeeper have Elastic IPs = stable public IPs,
 
  The VMs with Zookeeper have public IPs which are fed to the various
  Application services which rely on Kafka to stream their logs & monitoring
  data to our central Hadoop system.
 
  Using zkclient and the above-mentioned public zookeeper IPs, we get
  the list of brokers registered to a given Kafka service: this is
  where we unserialize Broker objects and then use
  getConnectionString() to discover the brokers' addresses. Then,
  broker addresses are used to initialize the Kafka producer(s).
 
  The whole trick is that we cannot use Elastic IPs (= stable IPs) for
  Kafka VMs, because of their 'elastic' nature: we want to be able to
  scale up / down the number of VMs with Kafka brokers.
 
  Now, we understand that using a non-public Kafka API is bad: we've been
  broken when moving to 0.8.1.1, then again when moving to 0.8.2.0...
 
  So it's time to raise the right question: what would be the supported
  way to configure our producers given our dynamic-IP-for-brokers
  context?
 
  Thanks,
  Alex.
 
  2015-02-01 8:55 GMT+01:00 VERMEERBERGEN Alexandre
  alexandre.vermeerber...@3ds.com:
  
   -Original Message-
   From: Jun Rao [mailto:j...@confluent.io]
   Sent: Sunday, February 01, 2015 3:03
   To: us...@kafka.apache.org; kafka-clie...@googlegroups.com
   Cc: dev@kafka.apache.org
   Subject: Re: [VOTE] 0.8.2.0 Candidate 3
  
   Hi, Alex,
  
   Thanks for testing RC3.
  
   Broker.connectionString() is actually not part of the public api for
 the
  producer. Is there a particular reason that you need to use this api?
  
   Thanks,
  
   Jun
  
   On Sat, Jan 31, 2015 at 1:53 PM, Alex The Rocker alex.m3...@gmail.com
 
   wrote:
  
   Hello,
  
   I have read Broker.scala source code, and I found the answer:
- With Kafka 0.8.1.1 we used Broker.getConnectionString() in our Java
   code.
- With Kafka 0.8.2.0, this method has been replaced by a 0-arity
   method without the get prefix, so we have to change our Java code to
   call
   Broker.connectionString()
  
    So although binary compatibility is broken, we have a bypass.
   I hope this will help other people relying on this API...
  
   and I'm going to continue tests with 0.8.2 rc3..
  
   Alex
  
   2015-01-31 21:23 GMT+01:00 Alex The Rocker alex.m3...@gmail.com:
  
Hello,
   
I ran my own tests made with kafka_2.10-0.8.1.1.tgz binaries with
our
application:
   
1st test:
==
  replace all kafka .jar files in our application on consuming side
  (without recompiling anything)
  => tests passed, OK
   
2nd test:
===
  replace all kafka .jar files in our application on producing side
  (without recompiling anything)
  => KO, we get this error:
   
2015-01-31 20:54:00,094 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect -
Exception in thread Timer-2
2015-01-31 20:54:00,111 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect -
java.lang.NoSuchMethodError:
kafka.cluster.Broker.getConnectionString()Ljava/lang/String;
   
Which means that binary compatibility with 0.8.1.1 version has been
   broken.
We use getConnectionString() to get Brokers' zookeeper addresses,
see
   this
answer from Neha:
   
   
   
  
 http://mail-archives.apache.org/mod_mbox/kafka-users/201404.mbox/%3CCA
   

[DISCUSSION] generate explicit error/failing metrics

2015-01-27 Thread Steven Wu
To illustrate my point, I will use the allTopicsOwnedPartitionsCount gauge
from ZookeeperConsumerConnector as an example. It captures the number of
partitions of a topic that have been assigned an owner in the consumer group.
let's say that I have a topic with 9 partitions. this metric should
normally report value 9. I can set up an alert
if allTopicsOwnedPartitionsCount < 9.

here are the drawbacks of this kind of metric.
1) if our metrics report/aggregation system has data loss and causes the
value to be reported as zero, we can't really distinguish whether it's a real
error or just data loss. so we can get a false positive/alarm from data loss.
2) if we change the number of partitions (e.g. from 9 to 18), we need to
remember to change the alert rule to allTopicsOwnedPartitionsCount < 18.
this kind of coupling is a maintenance nightmare.

A more explicit metric is NoOwnerPartitionsCount. it should be zero
normally. if it is not zero, we should be alerted. this way, we won't get
false alarms from data loss.
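
A hedged sketch of such an explicit metric, using the Yammer metrics library
the 0.8.x consumer already depends on; the class name and the ownership
tracking are illustrative only:

    import com.yammer.metrics.Metrics;
    import com.yammer.metrics.core.Gauge;
    import java.util.Set;

    public class OwnershipMetrics {
        public static void register(final int totalPartitions,
                                    final Set<Integer> ownedPartitions) {
            Metrics.newGauge(OwnershipMetrics.class, "NoOwnerPartitionsCount",
                    new Gauge<Integer>() {
                        @Override
                        public Integer value() {
                            // zero is the only healthy value
                            return totalPartitions - ownedPartitions.size();
                        }
                    });
        }
    }

The alert rule becomes simply NoOwnerPartitionsCount > 0, independent of the
partition count, and metric data loss reporting zero no longer fires a false
alarm.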

We don't have to change/fix this particular example since a new consumer is
being worked on. But in the new consumer please consider more explicit error
signals.

Thanks,
Steven


Re: Review Request 30158: Patch for KAFKA-1835

2015-01-23 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30158/#review69515
---



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/30158/#comment114212

I would
- for-loop to call Metadata.add(topic). this way we add all topics to 
Metadata
- call Metadata#requestUpdate() to trigger the Sender thread to request an update 
for all listed topics
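
A rough sketch of these two suggestions, assuming the internal
org.apache.kafka.clients Metadata API of that era; "pre.initialize.topics" is
a hypothetical config name:

    for (String topic : config.getList("pre.initialize.topics"))
        this.metadata.add(topic);   // register every listed topic
    this.metadata.requestUpdate();  // one flag wakes the Sender for all of them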



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/30158/#comment114215

I would not use initialized flag, as long as we fix 
KafkaProducer#waitOnMetadata to allow value 0 for non-blocking.


- Steven Wu


On Jan. 22, 2015, 7:04 a.m., Paul Pearcy wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/30158/
 ---
 
 (Updated Jan. 22, 2015, 7:04 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1835
 https://issues.apache.org/jira/browse/KAFKA-1835
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1835 - New producer updates to make blocking behavior explicit
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 fc71710dd5997576d3841a1c3b0f7e19a8c9698e 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75 
   core/src/test/scala/integration/kafka/api/ProducerBlockingTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
 ac15d34425795d5be20c51b01fa1108bdcd66583 
 
 Diff: https://reviews.apache.org/r/30158/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Paul Pearcy
 




two questions regarding 0.8.2 producer code

2014-11-10 Thread Steven Wu
I am checking out the source code of 0.8.2 producer code. I have two
questions and hope to get some clarifications.

1) Sender thread/Runnable has this run loop. what if the in-memory queue is
mostly empty (i.e. producer has very few msgs to send out)? will this
become a simple tight loop that just wastes cpu?

// main loop, runs until close is called
while (running) {
    try {
        run(time.milliseconds());
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

2) Selector#poll()  [line #200]

if (transmissions.hasSend())
    throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");

2.1) since it's an async API, back-to-back sends to the same broker seem very
possible to me. should we throw an exception here?
2.2) if this happens, it seems that we will silently drop the msgs without
executing callbacks?


Thanks,
Steven


Re: two questions regarding 0.8.2 producer code

2014-11-10 Thread Steven Wu
Jay, thanks a lot for the quick response.

For #2, I do see some isSendable() check in Sender.java and
NetworkClient.java that is eventually mapped to
InFlightRequests#canSendMore() check. it wasn't immediately clear to me how
is that translated to transmissions.hasSend() check. anyway, I will dig a
little more.

On Mon, Nov 10, 2014 at 2:59 PM, Jay Kreps jay.kr...@gmail.com wrote:

 1. No, the send does a poll/select on all the connections that will block
 for a specified time waiting for data to read or write on any connection.
 2. The api of the selector only allows you to send a request to a ready
 connection. The definition of ready is that it doesn't have another request
 in the process of being written (it can have other requests outstanding
 that were previously written). So if you hit this case it is a programming
 error, and it should never happen in the producer. The write path for the
 data is it is written to the internal queue/buffer and the sender grabs
 data from that for ready connections and writes to them. This slightly
 complex ready/send api is required to allow back-pressure in the producer
 to work.

 -Jay
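
 To illustrate point 1, a self-contained java.nio sketch of a timed select
 loop; select(timeout) parks the thread when there is nothing to do, so the
 loop does not spin:

     import java.nio.channels.SelectionKey;
     import java.nio.channels.Selector;
     import java.util.Iterator;

     public class TimedSelectLoop {
         private volatile boolean running = true;

         public void run(Selector selector) throws Exception {
             while (running) {
                 if (selector.select(100) == 0)  // blocks up to 100 ms when idle
                     continue;
                 Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                 while (it.hasNext()) {
                     SelectionKey key = it.next();
                     it.remove();
                     // read from / write to ready channels via key.channel()
                 }
             }
         }
     }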

 On Mon, Nov 10, 2014 at 2:44 PM, Steven Wu stevenz...@gmail.com wrote:

  I am checking out the source code of 0.8.2 producer code. I have two
  questions and hope to get some clarifications.
 
  1) Sender thread/Runnable has this run loop. what if the in-memory queue
 is
  mostly empty (i.e. producer has very few msgs to send out)? will this
  become a simple tight loop that just wastes cpu?
 
  // main loop, runs until close is called
  while (running) {
      try {
          run(time.milliseconds());
      } catch (Exception e) {
          log.error("Uncaught error in kafka producer I/O thread: ", e);
      }
  }
 
  2) Selector#poll()  [line #200]
 
  if (transmissions.hasSend())
      throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
 
  2.1) since it's an async API, back-to-back sends to the same broker seem
 very
  possible to me. should we throw an exception here?
  2.2) if this happens, it seems that we will silently drop the msgs
 without
  executing callbacks?
 
 
  Thanks,
  Steven
 



Re: How producer gets the acknowledgement back

2014-09-24 Thread Steven Wu
It may be too late to change the Producer API now.

I always find ListenableFuture very nice/usable. It essentially adds a
callback to Future. It's a lot easier to chain/combine ListenableFutures.
http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/util/concurrent/ListenableFuture.html
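
A sketch of what that could look like by adapting the plain Future that
send() already returns; it assumes Guava on the classpath, and note that
JdkFutureAdapters.listenInPoolThread ties up a thread per pending future:

    ListenableFuture<RecordMetadata> ack =
            JdkFutureAdapters.listenInPoolThread(producer.send(record));
    Futures.addCallback(ack, new FutureCallback<RecordMetadata>() {
        @Override
        public void onSuccess(RecordMetadata metadata) { /* acked */ }
        @Override
        public void onFailure(Throwable t) { /* log or retry */ }
    });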

On Wed, Sep 24, 2014 at 6:05 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hi,

 In the new (Java) producer, you can pass in a callback function in the

  Future<RecordMetadata> send(ProducerRecord record, Callback callback)

 call, which will be triggered when the ack is received.

 Alternatively, you can also call Future.get() on the returned future
 metadata, which will block until the ack is received (i.e., synced
 sending).

 Guozhang
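
 Both acknowledgement styles side by side, with the new producer API:

     producer.send(record, new Callback() {
         @Override
         public void onCompletion(RecordMetadata metadata, Exception exception) {
             if (exception != null)
                 exception.printStackTrace();  // the send failed
             else
                 System.out.println("acked: offset " + metadata.offset());
         }
     });

     // or block for the ack (synced sending):
     RecordMetadata metadata = producer.send(record).get();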

 On Wed, Sep 24, 2014 at 6:10 AM, Sreenivasulu Nallapati 
 sreenu.nallap...@gmail.com wrote:

  Hello,
 
   Can you please help me to get the acknowledgement in the producer?
 
 
   After setting the property *request.required.acks* to 1, how does the
   producer get the acknowledgement back? I am trying to get the
   acknowledgement in the java producer.
 
 
 
 
 
  Thanks
 
  Sreeni
 



 --
 -- Guozhang