[jira] [Commented] (KAFKA-2334) Prevent HW from going back during leader failover

2015-11-17 Thread Albert Strasheim (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009364#comment-15009364
 ] 

Albert Strasheim commented on KAFKA-2334:
-

Hitting this issue in production with 0.8.2.1 on a large cluster. Fix 
appreciated. :)

> Prevent HW from going back during leader failover 
> --
>
> Key: KAFKA-2334
> URL: https://issues.apache.org/jira/browse/KAFKA-2334
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.8.2.1
>Reporter: Guozhang Wang
>Assignee: Neha Narkhede
> Fix For: 0.10.0.0
>
>
> Consider the following scenario:
> 0. Kafka uses a replication factor of 2, with broker B1 as the leader and B2 
> as the follower. 
> 1. A producer keeps sending to Kafka with ack=-1.
> 2. A consumer repeatedly issues ListOffset requests to Kafka.
> And the following sequence:
> 0. B1's current log-end-offset (LEO) is 0, HW offset 0; same for B2.
> 1. B1 receives a ProduceRequest of 100 messages, appends them to its local 
> log (LEO becomes 100) and holds the request in purgatory.
> 2. B1 receives a FetchRequest starting at offset 0 from follower B2, and 
> returns the 100 messages.
> 3. B2 appends the received messages to its local log (LEO becomes 100).
> 4. B1 receives another FetchRequest starting at offset 100 from B2, learns 
> that B2's LEO has caught up to 100, and hence updates its own HW, satisfies 
> the ProduceRequest in purgatory, and sends the FetchResponse with HW 100 
> back to B2 ASYNCHRONOUSLY.
> 5. B1 successfully sends the ProduceResponse to the producer and then fails, 
> so the FetchResponse never reaches B2, whose HW remains 0.
> From the consumer's point of view, it could first see the latest offset of 
> 100 (from B1), then see the latest offset of 0 (from B2), and then watch the 
> latest offset gradually catch back up to 100.
> This is because we use the HW to guard ListOffset and 
> Fetch-from-ordinary-consumer.
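The sequence above can be sketched as a toy simulation (the Broker class and its fields are illustrative, not Kafka's actual data structures):

```python
# Toy model of the HW-rollback sequence described above. Brokers track a
# log-end offset (leo) and a high watermark (hw); a consumer's ListOffset
# request is answered from the HW of whichever broker is currently leader.

class Broker:
    def __init__(self):
        self.leo = 0   # log-end offset
        self.hw = 0    # high watermark

b1, b2 = Broker(), Broker()

# 1. Producer sends 100 messages to leader B1 (ack=-1, held in purgatory).
b1.leo = 100

# 2-3. Follower B2 fetches from offset 0 and appends the 100 messages.
b2.leo = b1.leo

# 4. B2's next fetch (at offset 100) tells B1 the follower has caught up,
#    so B1 advances its HW; the FetchResponse carrying HW=100 is sent back
#    to B2 asynchronously.
b1.hw = min(b1.leo, b2.leo)

# 5. B1 fails before that FetchResponse reaches B2, so B2's HW is still 0.
#    B2 becomes leader; the consumer's next ListOffset is answered from b2.hw.
assert b1.hw == 100
assert b2.hw == 0   # latest offset appears to go backwards: 100 -> 0
```

The two asserts at the end are exactly the consumer-visible regression: the latest offset reported drops from 100 to 0 after failover, then catches back up as B2 re-replicates.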



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1477) add authentication layer and initial JKS x509 implementation for brokers, producers and consumer for network communication

2014-08-11 Thread Albert Strasheim (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14093536#comment-14093536
 ] 

Albert Strasheim commented on KAFKA-1477:
-

CloudFlare is also very interested in this work. We are most interested in 
encryption and mutual authentication over TLS. I think it would be useful to 
have a single broker support both an unencrypted and an encrypted port (a 
producer on the local network sends unencrypted, while consumers on a remote 
network consume encrypted).

We're mainly using this to move log events between data centers.

We've managed to kludge this together on top of an existing Kafka setup by 
using stunnel/haproxy 1.5 to do the encryption part along the lines described 
here:

https://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3CCABgmubZT3=n780lxmvounzz6shgvpd_t8bn-wfoge5kxysv...@mail.gmail.com%3E

but this will only work with a single broker, and requires carefully matching 
the hostname/port in stunnel with the MetadataResponse, since setting 
advertised.host.name= and advertised.port= to the haproxy host and port causes 
the controller to try to use that host/port for metadata, which doesn't work 
(as one would expect).

To be able to support multiple brokers (we only need maybe 1, 2 or 3 in this 
setup), we will probably tweak our consumer code to remap the advertised 
host/port and connect over TLS directly (without stunnel).
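A remapping along those lines might look like the sketch below; the endpoint table and function name are hypothetical and not part of any Kafka client API:

```python
# Hypothetical sketch: rewrite broker endpoints from a MetadataResponse so
# the consumer dials a TLS-terminating proxy instead of the advertised
# host/port. The map entries are made-up example hostnames.

# (advertised_host, advertised_port) -> (tls_host, tls_port)
ENDPOINT_MAP = {
    ("broker1.internal", 9092): ("kafka1.example.com", 9093),
    ("broker2.internal", 9092): ("kafka2.example.com", 9093),
}

def remap_brokers(brokers):
    """brokers: list of (broker_id, host, port) tuples parsed from a
    metadata response; returns the same list with endpoints rewritten."""
    remapped = []
    for broker_id, host, port in brokers:
        tls_host, tls_port = ENDPOINT_MAP.get((host, port), (host, port))
        remapped.append((broker_id, tls_host, tls_port))
    return remapped

print(remap_brokers([(7, "broker1.internal", 9092)]))
```

The broker keeps advertising its internal address (so controller-to-broker traffic still works), and only the remote consumer applies the remap before connecting over TLS.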

 add authentication layer and initial JKS x509 implementation for brokers, 
 producers and consumer for network communication
 --

 Key: KAFKA-1477
 URL: https://issues.apache.org/jira/browse/KAFKA-1477
 Project: Kafka
  Issue Type: New Feature
Reporter: Joe Stein
Assignee: Ivan Lyutov
 Fix For: 0.8.2

 Attachments: KAFKA-1477-binary.patch, KAFKA-1477.patch, 
 KAFKA-1477_2014-06-02_16:59:40.patch, KAFKA-1477_2014-06-02_17:24:26.patch, 
 KAFKA-1477_2014-06-03_13:46:17.patch








[jira] [Created] (KAFKA-1509) Restart of destination broker after partition move leaves partitions without leader

2014-06-26 Thread Albert Strasheim (JIRA)
Albert Strasheim created KAFKA-1509:
---

 Summary: Restart of destination broker after partition move leaves 
partitions without leader
 Key: KAFKA-1509
 URL: https://issues.apache.org/jira/browse/KAFKA-1509
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Albert Strasheim


This should be reasonably easy to reproduce.

Make a Kafka cluster with a few machines.

Create a topic with partitions on these machines. No replication.

Bring up one more Kafka node.

Move some or all of the partitions onto this new broker:

kafka-reassign-partitions.sh --generate --zookeeper zk:2181 
--topics-to-move-json-file move.json --broker-list new broker

kafka-reassign-partitions.sh --zookeeper 36cfqd1.in.cfops.it:2181 
--reassignment-json-file reassign.json --execute

Wait until broker is the leader for all the partitions you moved.

Send some data to the partitions. It all works.

Shut down the broker that just received the data. Start it back up.
 
{code}
Topic:test  PartitionCount:2ReplicationFactor:1 Configs:
Topic: test Partition: 0Leader: -1  Replicas: 7 Isr: 
Topic: test Partition: 1Leader: -1  Replicas: 7 Isr: 
{code}

Leader for topic test never gets elected even though this node is the only node 
that knows about the topic.
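For reference, the JSON files passed to kafka-reassign-partitions.sh in the steps above follow the tool's documented input layout; a sketch with the topic and broker id taken from this repro:

```python
import json

# Input for --topics-to-move-json-file (move.json): which topics to move.
move = {"version": 1, "topics": [{"topic": "test"}]}

# Input for --reassignment-json-file (reassign.json): the target assignment,
# here placing both partitions of "test" on the new broker, id 7.
reassign = {
    "version": 1,
    "partitions": [
        {"topic": "test", "partition": 0, "replicas": [7]},
        {"topic": "test", "partition": 1, "replicas": [7]},
    ],
}

with open("move.json", "w") as f:
    json.dump(move, f)
with open("reassign.json", "w") as f:
    json.dump(reassign, f)
```

With no replication, each partition's replica list is the single destination broker, which matches the `Replicas: 7` shown in the topic description.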

Some logs:
{code}
Jun 26 23:18:07 localhost kafka: INFO [Socket Server on Broker 7], Started 
(kafka.network.SocketServer)
Jun 26 23:18:07 localhost kafka: INFO [Socket Server on Broker 7], Started 
(kafka.network.SocketServer)
Jun 26 23:18:07 localhost kafka: INFO [ControllerEpochListener on 7]: 
Initialized controller epoch to 53 and zk version 52 
(kafka.controller.ControllerEpochListener)
Jun 26 23:18:07 localhost kafka: INFO Will not load MX4J, mx4j-tools.jar is not 
in the classpath (kafka.utils.Mx4jLoader$)
Jun 26 23:18:07 localhost kafka: INFO Will not load MX4J, mx4j-tools.jar is not 
in the classpath (kafka.utils.Mx4jLoader$)
Jun 26 23:18:07 localhost kafka: INFO [Controller 7]: Controller starting up 
(kafka.controller.KafkaController)
Jun 26 23:18:07 localhost kafka: INFO conflict in /controller data: 
{version:1,brokerid:7,timestamp:1403824687354} stored data: 
{version:1,brokerid:4,timestamp:1403297911725} (kafka.utils.ZkUtils$)
Jun 26 23:18:07 localhost kafka: INFO conflict in /controller data: 
{version:1,brokerid:7,timestamp:1403824687354} stored data: 
{version:1,brokerid:4,timestamp:1403297911725} (kafka.utils.ZkUtils$)
Jun 26 23:18:07 localhost kafka: INFO [Controller 7]: Controller startup 
complete (kafka.controller.KafkaController)
Jun 26 23:18:07 localhost kafka: INFO Registered broker 7 at path 
/brokers/ids/7 with address xxx:9092. (kafka.utils.ZkUtils$)
Jun 26 23:18:07 localhost kafka: INFO Registered broker 7 at path 
/brokers/ids/7 with address xxx:9092. (kafka.utils.ZkUtils$)
Jun 26 23:18:07 localhost kafka: INFO [Kafka Server 7], started 
(kafka.server.KafkaServer)
Jun 26 23:18:07 localhost kafka: INFO [Kafka Server 7], started 
(kafka.server.KafkaServer)
Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info 
(LeaderAndIsrInfo:(Leader:3,ISR:3,LeaderEpoch:14,ControllerEpoch:53),ReplicationFactor:1),AllReplicas:3)
 for partition [requests,0] in response to UpdateMetadata request sent by 
controller 4 epoch 53 with correlation id 70 (state.change.logger)
Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info 
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:11,ControllerEpoch:53),ReplicationFactor:1),AllReplicas:1)
 for partition [requests,13] in response to UpdateMetadata request sent by 
controller 4 epoch 53 with correlation id 70 (state.change.logger)
Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info 
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:4,ControllerEpoch:53),ReplicationFactor:2),AllReplicas:1,5)
 for partition [requests_ipv6,5] in response to UpdateMetadata request sent by 
controller 4 epoch 53 with correlation id 70 (state.change.logger)
Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info 
(LeaderAndIsrInfo:(Leader:4,ISR:4,LeaderEpoch:13,ControllerEpoch:53),ReplicationFactor:2),AllReplicas:4,5)
 for partition [requests_stored,7] in response to UpdateMetadata request sent 
by controller 4 epoch 53 with correlation id 70 (state.change.logger)
Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info 
(LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:5,ControllerEpoch:53),ReplicationFactor:1),AllReplicas:7)
 for partition [test,1] in response to UpdateMetadata request sent by 
controller 4 epoch 53 with correlation id 70 (state.change.logger)
Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info 
(LeaderAndIsrInfo:(Leader:5,ISR:5,LeaderEpoch:17,ControllerEpoch:53),ReplicationFactor:1),AllReplicas:5)
 for partition [requests,6] in response to UpdateMetadata request sent by 
controller 4 

[jira] [Updated] (KAFKA-1509) Restart of destination broker after partition move leaves partitions without leader

2014-06-26 Thread Albert Strasheim (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Albert Strasheim updated KAFKA-1509:


Attachment: controller2.log

Controller log. I tried a bunch of things (partition reassignment, leader 
election) to make this fix itself, without luck.

If there is some way to drop the broken topics by editing ZooKeeper without 
taking down this cluster, I'd love to know about it. I don't mind losing the 
topics (they were for testing).

 Restart of destination broker after partition move leaves partitions without 
 leader
 ---

 Key: KAFKA-1509
 URL: https://issues.apache.org/jira/browse/KAFKA-1509
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Albert Strasheim
 Attachments: controller2.log



[jira] [Commented] (KAFKA-1509) Restart of destination broker after partition move leaves partitions without leader

2014-06-26 Thread Albert Strasheim (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045497#comment-14045497
 ] 

Albert Strasheim commented on KAFKA-1509:
-

Workaround: patch up all the /brokers/topic/partitions/part/state entries in 
ZK with the leader and ISR values you want and then apply the same config with 
/usr/local/kafka/bin/kafka-reassign-partitions.sh.
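The znode payload being patched has roughly this shape; the field values below are illustrative (taken from the controller logs in this report), and this is a sketch of building the JSON, not a full ZooKeeper client session:

```python
import json

# State znode for one partition, e.g. /brokers/topics/test/partitions/0/state.
# Illustrative values: broker 7 as leader and sole ISR member.
state = {
    "controller_epoch": 53,   # from the controller logs above
    "leader": 7,
    "version": 1,
    "leader_epoch": 6,        # must be bumped past the current leader epoch
    "isr": [7],
}

payload = json.dumps(state)
# A ZooKeeper client (zkCli.sh, kazoo, ...) would then write `payload` to the
# partition's state path before re-running the reassignment tool.
print(payload)
```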

 Restart of destination broker after partition move leaves partitions without 
 leader
 ---

 Key: KAFKA-1509
 URL: https://issues.apache.org/jira/browse/KAFKA-1509
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Albert Strasheim
 Attachments: controller2.log



[jira] [Comment Edited] (KAFKA-1509) Restart of destination broker after partition move leaves partitions without leader

2014-06-26 Thread Albert Strasheim (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045497#comment-14045497
 ] 

Albert Strasheim edited comment on KAFKA-1509 at 6/27/14 4:04 AM:
--

Workaround: patch up all the /brokers/topic/partitions/part/state entries in 
ZK with the leader and ISR values you want and then apply the same config with 
/usr/local/kafka/bin/kafka-preferred-replica-election.sh.


was (Author: fullung):
Workaround: patch up all the /brokers/topic/partitions/part/state entries in 
ZK with the leader and ISR values you want and then apply the same config with 
/usr/local/kafka/bin/kafka-reassign-partitions.sh.

 Restart of destination broker after partition move leaves partitions without 
 leader
 ---

 Key: KAFKA-1509
 URL: https://issues.apache.org/jira/browse/KAFKA-1509
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Albert Strasheim
 Attachments: controller2.log



[jira] [Updated] (KAFKA-1509) Restart of destination broker after unreplicated partition move leaves partitions without leader

2014-06-26 Thread Albert Strasheim (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Albert Strasheim updated KAFKA-1509:


Summary: Restart of destination broker after unreplicated partition move 
leaves partitions without leader  (was: Restart of destination broker after 
partition move leaves partitions without leader)

 Restart of destination broker after unreplicated partition move leaves 
 partitions without leader
 

 Key: KAFKA-1509
 URL: https://issues.apache.org/jira/browse/KAFKA-1509
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Albert Strasheim
 Attachments: controller2.log



[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-06-13 Thread Albert Strasheim (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14030908#comment-14030908
 ] 

Albert Strasheim commented on KAFKA-1493:
-

What does the format look like for a Snappy compressed message?

One might simply need a varint-encoded field for the uncompressed length 
followed by a compressed block.

The LZ4 streaming format and the xxhash, etc. in there might be overkill.
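The framing being suggested (a varint-encoded uncompressed length followed by a single compressed block) can be sketched like this, using zlib in place of LZ4 since it ships with the standard library:

```python
import zlib

def encode_varint(n):
    """Unsigned LEB128 varint, as used by e.g. protobuf."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # continuation bit set
        else:
            out.append(byte)
            return bytes(out)

def decode_varint(buf, pos=0):
    shift = result = 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            return result, pos
        shift += 7

def frame(payload):
    """Varint uncompressed length, then one compressed block."""
    return encode_varint(len(payload)) + zlib.compress(payload)

def unframe(buf):
    length, pos = decode_varint(buf)
    data = zlib.decompress(buf[pos:])
    assert len(data) == length   # length prefix doubles as a sanity check
    return data

msg = b"x" * 1000
assert unframe(frame(msg)) == msg
```

The length prefix lets a decoder allocate the output buffer up front, which is most of what the full LZ4 streaming format (magic bytes, xxhash checksums, block descriptors) would otherwise provide here.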

 Use a well-documented LZ4 compression format and remove redundant LZ4HC option
 --

 Key: KAFKA-1493
 URL: https://issues.apache.org/jira/browse/KAFKA-1493
 Project: Kafka
  Issue Type: Improvement
Reporter: James Oliver
 Fix For: 0.8.2








[jira] [Comment Edited] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-06-13 Thread Albert Strasheim (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14030908#comment-14030908
 ] 

Albert Strasheim edited comment on KAFKA-1493 at 6/13/14 5:59 PM:
--

What does the compression format look like for a Snappy compressed message?

One might simply need a varint-encoded field for the uncompressed length 
followed by a compressed block.

The LZ4 streaming format and the xxhash, etc. in there might be overkill.


was (Author: fullung):
What does the format look like for a Snappy compressed message?

One might simply need a varint-encoded field for the uncompressed length 
followed by a compressed block.

The LZ4 streaming format and the xxhash, etc. in there might be overkill.

 Use a well-documented LZ4 compression format and remove redundant LZ4HC option
 --

 Key: KAFKA-1493
 URL: https://issues.apache.org/jira/browse/KAFKA-1493
 Project: Kafka
  Issue Type: Improvement
Reporter: James Oliver
 Fix For: 0.8.2








[jira] [Commented] (KAFKA-1456) Add LZ4 and LZ4C as a compression codec

2014-06-12 Thread Albert Strasheim (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14029266#comment-14029266
 ] 

Albert Strasheim commented on KAFKA-1456:
-

Hi all. Did Stephan's concerns get addressed? Pretty please (with sugar on top) 
don't leave the LZ4 compression format as it stands in 
github.com/jpountz/lz4-java right now.

 Add LZ4 and LZ4C as a compression codec
 ---

 Key: KAFKA-1456
 URL: https://issues.apache.org/jira/browse/KAFKA-1456
 Project: Kafka
  Issue Type: Improvement
Reporter: Joe Stein
  Labels: newbie
 Fix For: 0.8.2

 Attachments: KAFKA-1456.patch, KAFKA-1456_2014-05-19_15:01:10.patch, 
 KAFKA-1456_2014-05-19_16:39:01.patch, KAFKA-1456_2014-05-19_18:19:32.patch, 
 KAFKA-1456_2014-05-19_23:24:27.patch








[jira] [Commented] (KAFKA-1449) Extend wire protocol to allow CRC32C

2014-05-16 Thread Albert Strasheim (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13999157#comment-13999157
 ] 

Albert Strasheim commented on KAFKA-1449:
-

Maybe MagicByte also needs to be incremented before introducing the Attributes 
change, but that might break more consumers than necessary.

 Extend wire protocol to allow CRC32C
 

 Key: KAFKA-1449
 URL: https://issues.apache.org/jira/browse/KAFKA-1449
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Albert Strasheim
Assignee: Neha Narkhede
 Fix For: 0.9.0


 Howdy
 We are currently building out a number of Kafka consumers in Go, based on a 
 patched version of the Sarama library that Shopify released a while back.
 We have a reasonably fast serialization protocol (Cap'n Proto), a 10G network 
 and lots of cores. We have various consumers computing all kinds of 
 aggregates on a reasonably high volume access log stream (1.1e6 messages/sec 
 peak, about 500-600 bytes per message uncompressed).
 When profiling our consumer, our single hottest function (until we disabled 
 it), was the CRC32 checksum validation, since the deserialization and 
 aggregation in these consumers is pretty cheap.
 We believe things could be improved by extending the wire protocol to support 
 CRC-32C (Castagnoli), since SSE 4.2 has an instruction to accelerate its 
 calculation.
 https://en.wikipedia.org/wiki/SSE4#SSE4.2
 It might be hard to use from Java, but consumers written in most other 
 languages will benefit a lot.
 To give you an idea, here are some benchmarks for the Go CRC32 functions 
 running on an Intel(R) Core(TM) i7-3540M CPU @ 3.00GHz core:
 BenchmarkCrc32KB   90196 ns/op 363.30 MB/s
 BenchmarkCrcCastagnoli32KB 3404 ns/op 9624.42 MB/s
 I believe BenchmarkCrc32 written in C would do about 600-700 MB/sec, and the 
 CRC32-C speed should be close to what one achieves in Go.
 (Met Todd and Clark at the meetup last night. Thanks for the great 
 presentation!)





[jira] [Comment Edited] (KAFKA-1449) Extend wire protocol to allow CRC32C

2014-05-15 Thread Albert Strasheim (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13998466#comment-13998466
 ] 

Albert Strasheim edited comment on KAFKA-1449 at 5/15/14 5:49 AM:
--

The Attributes field in Message has 6 bits to play with. Maybe if the 3rd bit 
is set, it could mean that the Crc field is a CRC32C instead of a CRC32.

New producers producing messages for new consumers could choose to set the 
third bit and use CRC32C instead of CRC32. New consumers will check for the bit 
being set and verify the checksum as CRC32C instead of CRC32.

A new producer for a stream with old consumers should produce CRC32 messages. 
If it doesn't, old consumers not checking for the 3rd bit being set will verify 
the CRC32C checksum as a CRC32, which should fail, which seems like a good 
enough outcome. Old consumers can't read data from new producers that choose to 
use the new checksum.

Old producers continue to use CRC32 and new consumers will continue to verify 
with CRC32, which will be slow. To fix this, upgrade your producer code.

If Kafka does any internal verification, it will have to be updated to check 
for the 3rd bit being set and verify the checksum accordingly.

Did I miss anything?

P.S. This is probably a separate JIRA, but for compression, it would be nice to 
support LZ4 too. LZ4 and LZ4HC have compression speeds similar to Snappy, but 
much better decompression speeds.

Benchmarks here: https://code.google.com/p/lz4/

Keeping this in mind, maybe the 3rd bit should also be reserved for evolving 
the available compression algorithms and the 4th bit can be for CRC32/CRC32C?
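
As a sketch, the dispatch a consumer would do under this scheme could look like the following; the bit position and the `crc32cBit` constant are hypothetical illustrations of the proposal, not anything Kafka standardized.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// Hypothetical attribute bit: if set, the message's Crc field is
// CRC-32C (Castagnoli) instead of the default CRC-32 (IEEE).
const crc32cBit = 0x04

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// checksum picks the polynomial based on the attribute bit, as a new
// consumer would when verifying a message.
func checksum(attributes byte, payload []byte) uint32 {
	if attributes&crc32cBit != 0 {
		return crc32.Checksum(payload, castagnoli)
	}
	return crc32.ChecksumIEEE(payload)
}

func main() {
	msg := []byte("hello")
	// Old-style message: bit clear, plain CRC-32.
	fmt.Println(checksum(0, msg) == crc32.ChecksumIEEE(msg))
	// New-style message: bit set, CRC-32C.
	fmt.Println(checksum(crc32cBit, msg) == crc32.Checksum(msg, castagnoli))
}
```

An old consumer that ignores the bit would compute CRC-32 over a CRC-32C message and fail verification, which is the "good enough outcome" described above.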


was (Author: fullung):
Attributes in Message has 6 bits to play with. Maybe if the 3rd bit is set, it 
could mean that the Crc field is a CRC32C instead of a CRC32.

New CRC32C producers producing messages for new consumers could choose to set 
the third bit and use CRC32C instead of CRC32. New consumers will check for the 
bit being set and verify the checksum as CRC32C instead of CRC32.

A new producer for a stream with old consumers should produce CRC32 messages. 
If it doesn't, old consumers not checking for the 3rd bit being set will verify 
the CRC32C checksum as a CRC32, which should fail, which seems like a good 
enough outcome. Old consumers can't read data from new producers that choose to 
use the new checksum.

Old producers continue to use CRC32 and new consumers will continue to verify 
with CRC32, which will be slow. To fix this, upgrade your producer code.

If Kafka does any internal verification, it will have to be updated to check 
for the 3rd bit being set and verify the checksum accordingly.

Did I miss anything?

P.S. This is probably a separate JIRA, but for compression, it would be nice to 
support LZ4 too. LZ4 and LZ4HC have compression speeds similar to Snappy, but 
much better decompression speeds.

Benchmarks here: https://code.google.com/p/lz4/

Keeping this in mind, maybe the 3rd bit should also be reserved for evolving 
the available compression algorithms and the 4th bit can be for CRC32/CRC32C?


[jira] [Commented] (KAFKA-1449) Extend wire protocol to allow CRC32C

2014-05-15 Thread Albert Strasheim (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13998466#comment-13998466
 ] 

Albert Strasheim commented on KAFKA-1449:
-

Attributes in Message has 6 bits to play with. Maybe if the 3rd bit is set, it 
could mean that the Crc field is a CRC32C instead of a CRC32.

New CRC32C producers producing messages for new consumers could choose to set 
the third bit and use CRC32C instead of CRC32. New consumers will check for the 
bit being set and verify the checksum as CRC32C instead of CRC32.

A new producer for a stream with old consumers should produce CRC32 messages. 
If it doesn't, old consumers not checking for the 3rd bit being set will verify 
the CRC32C checksum as a CRC32, which should fail, which seems like a good 
enough outcome. Old consumers can't read data from new producers that choose to 
use the new checksum.

Old producers continue to use CRC32 and new consumers will continue to verify 
with CRC32, which will be slow. To fix this, upgrade your producer code.

If Kafka does any internal verification, it will have to be updated to check 
for the 3rd bit being set and verify the checksum accordingly.

Did I miss anything?

P.S. This is probably a separate JIRA, but for compression, it would be nice to 
support LZ4 too. LZ4 and LZ4HC have compression speeds similar to Snappy, but 
much better decompression speeds.

Benchmarks here: https://code.google.com/p/lz4/

Keeping this in mind, maybe the 3rd bit should also be reserved for evolving 
the available compression algorithms and the 4th bit can be for CRC32/CRC32C?



[jira] [Comment Edited] (KAFKA-1449) Extend wire protocol to allow CRC32C

2014-05-15 Thread Albert Strasheim (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13998466#comment-13998466
 ] 

Albert Strasheim edited comment on KAFKA-1449 at 5/15/14 5:50 AM:
--

Attributes in Message has 6 bits to play with. Maybe if the 3rd bit is set, it 
could mean that the Crc field is a CRC32C instead of a CRC32.

New producers producing messages for new consumers could choose to set the 
third bit and use CRC32C instead of CRC32. New consumers will check for the bit 
being set and verify the checksum as CRC32C instead of CRC32.

A new producer for a stream with old consumers should produce CRC32 messages. 
If it doesn't, old consumers not checking for the 3rd bit being set will verify 
the CRC32C checksum as a CRC32, which should fail, which seems like a good 
enough outcome. Old consumers can't read data from new producers that choose to 
use the new checksum.

Old producers continue to use CRC32 and new consumers will continue to verify 
with CRC32 since the 3rd bit should be zero according to the protocol spec, 
which will be slow. To go fast, upgrade your producer code first and start 
using CRC32C there.

If Kafka does any internal verification, it will have to be updated to check 
for the 3rd bit being set and verify the checksum accordingly.

Did I miss anything?

P.S. This is probably a separate JIRA, but for compression, it would be nice to 
support LZ4 too. LZ4 and LZ4HC have compression speeds similar to Snappy, but 
much better decompression speeds.

Benchmarks here: https://code.google.com/p/lz4/

Keeping this in mind, maybe the 3rd bit should also be reserved for evolving 
the available compression algorithms and the 4th bit can be for CRC32/CRC32C?


was (Author: fullung):
Attributes in Message has 6 bits to play with. Maybe if the 3rd bit is set, it 
could mean that the Crc field is a CRC32C instead of a CRC32.

New producers producing messages for new consumers could choose to set the 
third bit and use CRC32C instead of CRC32. New consumers will check for the bit 
being set and verify the checksum as CRC32C instead of CRC32.

A new producer for a stream with old consumers should produce CRC32 messages. 
If it doesn't, old consumers not checking for the 3rd bit being set will verify 
the CRC32C checksum as a CRC32, which should fail, which seems like a good 
enough outcome. Old consumers can't read data from new producers that choose to 
use the new checksum.

Old producers continue to use CRC32 and new consumers will continue to verify 
with CRC32, which will be slow. To fix this, upgrade your producer code.

If Kafka does any internal verification, it will have to be updated to check 
for the 3rd bit being set and verify the checksum accordingly.

Did I miss anything?

P.S. This is probably a separate JIRA, but for compression, it would be nice to 
support LZ4 too. LZ4 and LZ4HC have compression speeds similar to Snappy, but 
much better decompression speeds.

Benchmarks here: https://code.google.com/p/lz4/

Keeping this in mind, maybe the 3rd bit should also be reserved for evolving 
the available compression algorithms and the 4th bit can be for CRC32/CRC32C?


[jira] [Updated] (KAFKA-1449) Extend wire protocol to allow CRC32C

2014-05-13 Thread Albert Strasheim (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Albert Strasheim updated KAFKA-1449:


Description: 
Howdy

We are currently building out a number of Kafka consumers in Go, based on a 
patched version of the Sarama library that Shopify released a while back.

We have a reasonably fast serialization protocol (Cap'n Proto), a 10G network 
and lots of cores. We have various consumers computing all kinds of aggregates 
on a reasonably high volume access log stream (1.1e6 messages/sec peak, about 
500-600 bytes per message uncompressed).

When profiling our consumer, our single hottest function (until we disabled 
it), was the CRC32 checksum validation, since the deserialization and 
aggregation in these consumers is pretty cheap.

We believe things could be improved by extending the wire protocol to support 
CRC-32C (Castagnoli), since SSE 4.2 has an instruction to accelerate its 
calculation.

https://en.wikipedia.org/wiki/SSE4#SSE4.2

It might be hard to use from Java, but consumers written in most other 
languages will benefit a lot.

To give you an idea, here are some benchmarks for the Go CRC32 functions 
running on an Intel(R) Core(TM) i7-3540M CPU @ 3.00GHz core:

BenchmarkCrc32KB 90196 ns/op 363.30 MB/s
BenchmarkCrcCastagnoli32KB 3404 ns/op 9624.42 MB/s

I believe BenchmarkCrc32 written in C would do about 600-700 MB/sec, and the 
CRC32-C speed should be close to what one achieves in Go.

(Met Todd and Clark at the meetup last night. Thanks for the great 
presentation!)

  was:
Howdy

We are currently building out a number of Kafka consumers in Go, based on a 
patched version of the Sarama library that Shopify released a while back.

We have a reasonably fast serialization protocol (Cap'n Proto), a 10G network 
and lots of cores. We have various consumers computing all kinds of aggregates 
on a reasonably high volume access log stream (1e6 messages/sec peak, about 
500-600 bytes per message uncompressed).

When profiling our consumer, our single hottest function (until we disabled 
it), was the CRC32 checksum validation, since the deserialization and 
aggregation in these consumers is pretty cheap.

We believe things could be improved by extending the wire protocol to support 
CRC-32C (Castagnoli), since SSE 4.2 has an instruction to accelerate its 
calculation.

https://en.wikipedia.org/wiki/SSE4#SSE4.2

It might be hard to use from Java, but consumers written in most other 
languages will benefit a lot.

To give you an idea, here are some benchmarks for the Go CRC32 functions 
running on an Intel(R) Core(TM) i7-3540M CPU @ 3.00GHz core:

BenchmarkCrc32KB 90196 ns/op 363.30 MB/s
BenchmarkCrcCastagnoli32KB 3404 ns/op 9624.42 MB/s

I believe BenchmarkCrc32 written in C would do about 600-700 MB/sec, and the 
CRC32-C speed should be close to what one achieves in Go.

(Met Todd and Clark at the meetup last night. Thanks for the great 
presentation!)




[jira] [Created] (KAFKA-1449) Extend wire protocol to allow CRC32C

2014-05-13 Thread Albert Strasheim (JIRA)
Albert Strasheim created KAFKA-1449:
---

 Summary: Extend wire protocol to allow CRC32C
 Key: KAFKA-1449
 URL: https://issues.apache.org/jira/browse/KAFKA-1449
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Albert Strasheim
Assignee: Neha Narkhede
 Fix For: 0.9.0


Howdy

We are currently building out a number of Kafka consumers in Go, based on a 
patched version of the Sarama library that Shopify released a while back.

We have a reasonably fast serialization protocol (Cap'n Proto), a 10G network 
and lots of cores. We have various consumers computing all kinds of aggregates 
on a reasonably high volume access log stream (1e6 messages/sec peak, about 
500-600 bytes per message uncompressed).

When profiling our consumer, our single hottest function (until we disabled 
it), was the CRC32 checksum validation, since the deserialization and 
aggregation in these consumers is pretty cheap.

We believe things could be improved by extending the wire protocol to support 
CRC-32C (Castagnoli), since SSE 4.2 has an instruction to accelerate its 
calculation.

https://en.wikipedia.org/wiki/SSE4#SSE4.2

It might be hard to use from Java, but consumers written in most other 
languages will benefit a lot.

To give you an idea, here are some benchmarks for the Go CRC32 functions 
running on an Intel(R) Core(TM) i7-3540M CPU @ 3.00GHz core:

BenchmarkCrc32KB 90196 ns/op 363.30 MB/s
BenchmarkCrcCastagnoli32KB 3404 ns/op 9624.42 MB/s

I believe BenchmarkCrc32 written in C would do about 600-700 MB/sec, and the 
CRC32-C speed should be close to what one achieves in Go.

(Met Todd and Clark at the meetup last night. Thanks for the great 
presentation!)


