[jira] [Commented] (KAFKA-2334) Prevent HW from going back during leader failover
[ https://issues.apache.org/jira/browse/KAFKA-2334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009364#comment-15009364 ] Albert Strasheim commented on KAFKA-2334:
-
Hitting this issue in production with 0.8.2.1 on a large cluster. Fix appreciated. :)
> Prevent HW from going back during leader failover
> -------------------------------------------------
>
> Key: KAFKA-2334
> URL: https://issues.apache.org/jira/browse/KAFKA-2334
> Project: Kafka
> Issue Type: Bug
> Components: replication
> Affects Versions: 0.8.2.1
> Reporter: Guozhang Wang
> Assignee: Neha Narkhede
> Fix For: 0.10.0.0
>
> Consider the following scenario:
> 0. Kafka uses a replication factor of 2, with broker B1 as the leader and B2 as the follower.
> 1. A producer keeps sending to Kafka with ack=-1.
> 2. A consumer repeatedly issues ListOffset requests to Kafka.
> And the following sequence:
> 0. B1's current log-end-offset (LEO) is 0 and its HW offset is 0; same for B2.
> 1. B1 receives a ProduceRequest of 100 messages, appends them to its local log (LEO becomes 100) and holds the request in purgatory.
> 2. B1 receives a FetchRequest starting at offset 0 from follower B2, and returns the 100 messages.
> 3. B2 appends the received messages to its local log (LEO becomes 100).
> 4. B1 receives another FetchRequest starting at offset 100 from B2, learns that B2's LEO has caught up to 100, hence updates its own HW, satisfies the ProduceRequest in purgatory, and sends the FetchResponse with HW 100 back to B2 ASYNCHRONOUSLY.
> 5. B1 successfully sends the ProduceResponse to the producer, and then fails, so the FetchResponse never reaches B2, whose HW remains 0.
> From the consumer's point of view, it could first see the latest offset of 100 (from B1), then see the latest offset of 0 (from B2), and then watch the latest offset gradually catch up to 100.
> This is because we use HW to guard the ListOffset and Fetch-from-ordinary-consumer paths.
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1477) add authentication layer and initial JKS x509 implementation for brokers, producers and consumer for network communication
[ https://issues.apache.org/jira/browse/KAFKA-1477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14093536#comment-14093536 ] Albert Strasheim commented on KAFKA-1477:
-
CloudFlare is also very interested in this work. We are most interested in encryption and mutual authentication over TLS. I think it would be useful to have a single broker support both an unencrypted and an encrypted port (producers on the local network send unencrypted; consumers on a remote network consume encrypted). We're mainly using this to move log events between data centers.

We've managed to kludge this together on top of an existing Kafka setup by using stunnel/haproxy 1.5 to do the encryption part, along the lines described here:

https://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3CCABgmubZT3=n780lxmvounzz6shgvpd_t8bn-wfoge5kxysv...@mail.gmail.com%3E

but this only works with a single broker, and requires carefully matching the hostname/port in stunnel with the MetadataResponse, since setting advertised.host.name= and advertised.port= to the haproxy host and port causes the controller to try to use that host/port for metadata, which doesn't work (as one would expect).

To support multiple brokers (we only need maybe 1, 2 or 3 in this setup), we will probably tweak our consumer code to remap the advertised host/port and connect over TLS directly (without stunnel).
> add authentication layer and initial JKS x509 implementation for brokers, producers and consumer for network communication
> --------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-1477
> URL: https://issues.apache.org/jira/browse/KAFKA-1477
> Project: Kafka
> Issue Type: New Feature
> Reporter: Joe Stein
> Assignee: Ivan Lyutov
> Fix For: 0.8.2
> Attachments: KAFKA-1477-binary.patch, KAFKA-1477.patch, KAFKA-1477_2014-06-02_16:59:40.patch, KAFKA-1477_2014-06-02_17:24:26.patch, KAFKA-1477_2014-06-03_13:46:17.patch
--
This message was sent by Atlassian JIRA (v6.2#6252)
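The remapping approach mentioned at the end of the comment can be sketched, assuming a hand-maintained table from the address a broker advertises in MetadataResponse to its stunnel/haproxy TLS endpoint; all hostnames, ports, and function names below are hypothetical illustrations, not part of the patch under discussion:

```go
package main

import (
	"crypto/tls"
	"fmt"
)

// remap is a hypothetical, hand-maintained table from the host:port a broker
// advertises in MetadataResponse to the TLS endpoint the consumer should dial.
var remap = map[string]string{
	"broker1.internal:9092": "kafka-tls.example.com:9093",
}

// resolve translates an advertised broker address into a TLS endpoint,
// failing loudly when a broker has no mapping.
func resolve(advertised string) (string, error) {
	addr, ok := remap[advertised]
	if !ok {
		return "", fmt.Errorf("no TLS mapping for advertised broker %s", advertised)
	}
	return addr, nil
}

// dialBroker dials the remapped TLS endpoint directly instead of the
// advertised address, so advertised.host.name can keep pointing at the
// real broker for controller/metadata traffic.
func dialBroker(advertised string) (*tls.Conn, error) {
	addr, err := resolve(advertised)
	if err != nil {
		return nil, err
	}
	return tls.Dial("tcp", addr, &tls.Config{MinVersion: tls.VersionTLS12})
}

func main() {
	addr, _ := resolve("broker1.internal:9092")
	fmt.Println("would dial:", addr)
}
```

This keeps the controller happy while letting remote consumers get encryption without stunnel on the consumer side.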
[jira] [Created] (KAFKA-1509) Restart of destination broker after partition move leaves partitions without leader
Albert Strasheim created KAFKA-1509:
---
Summary: Restart of destination broker after partition move leaves partitions without leader
Key: KAFKA-1509
URL: https://issues.apache.org/jira/browse/KAFKA-1509
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Albert Strasheim

This should be reasonably easy to reproduce.

Make a Kafka cluster with a few machines. Create a topic with partitions on these machines. No replication. Bring up one more Kafka node. Move some or all of the partitions onto this new broker:

{code}
kafka-reassign-partitions.sh --generate --zookeeper zk:2181 --topics-to-move-json-file move.json --broker-list <new broker>
kafka-reassign-partitions.sh --zookeeper 36cfqd1.in.cfops.it:2181 --reassignment-json-file reassign.json --execute
{code}

Wait until the broker is the leader for all the partitions you moved. Send some data to the partitions. It all works. Shut down the broker that just received the data. Start it back up.

{code}
Topic:test  PartitionCount:2  ReplicationFactor:1  Configs:
  Topic: test  Partition: 0  Leader: -1  Replicas: 7  Isr:
  Topic: test  Partition: 1  Leader: -1  Replicas: 7  Isr:
{code}

The leader for topic test never gets elected, even though this node is the only node that knows about the topic.
Some logs:

{code}
Jun 26 23:18:07 localhost kafka: INFO [Socket Server on Broker 7], Started (kafka.network.SocketServer)
Jun 26 23:18:07 localhost kafka: INFO [Socket Server on Broker 7], Started (kafka.network.SocketServer)
Jun 26 23:18:07 localhost kafka: INFO [ControllerEpochListener on 7]: Initialized controller epoch to 53 and zk version 52 (kafka.controller.ControllerEpochListener)
Jun 26 23:18:07 localhost kafka: INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
Jun 26 23:18:07 localhost kafka: INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
Jun 26 23:18:07 localhost kafka: INFO [Controller 7]: Controller starting up (kafka.controller.KafkaController)
Jun 26 23:18:07 localhost kafka: INFO conflict in /controller data: {version:1,brokerid:7,timestamp:1403824687354} stored data: {version:1,brokerid:4,timestamp:1403297911725} (kafka.utils.ZkUtils$)
Jun 26 23:18:07 localhost kafka: INFO conflict in /controller data: {version:1,brokerid:7,timestamp:1403824687354} stored data: {version:1,brokerid:4,timestamp:1403297911725} (kafka.utils.ZkUtils$)
Jun 26 23:18:07 localhost kafka: INFO [Controller 7]: Controller startup complete (kafka.controller.KafkaController)
Jun 26 23:18:07 localhost kafka: INFO Registered broker 7 at path /brokers/ids/7 with address xxx:9092. (kafka.utils.ZkUtils$)
Jun 26 23:18:07 localhost kafka: INFO Registered broker 7 at path /brokers/ids/7 with address xxx:9092. (kafka.utils.ZkUtils$)
Jun 26 23:18:07 localhost kafka: INFO [Kafka Server 7], started (kafka.server.KafkaServer)
Jun 26 23:18:07 localhost kafka: INFO [Kafka Server 7], started (kafka.server.KafkaServer)
Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info (LeaderAndIsrInfo:(Leader:3,ISR:3,LeaderEpoch:14,ControllerEpoch:53),ReplicationFactor:1),AllReplicas:3) for partition [requests,0] in response to UpdateMetadata request sent by controller 4 epoch 53 with correlation id 70 (state.change.logger)
Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:11,ControllerEpoch:53),ReplicationFactor:1),AllReplicas:1) for partition [requests,13] in response to UpdateMetadata request sent by controller 4 epoch 53 with correlation id 70 (state.change.logger)
Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:4,ControllerEpoch:53),ReplicationFactor:2),AllReplicas:1,5) for partition [requests_ipv6,5] in response to UpdateMetadata request sent by controller 4 epoch 53 with correlation id 70 (state.change.logger)
Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info (LeaderAndIsrInfo:(Leader:4,ISR:4,LeaderEpoch:13,ControllerEpoch:53),ReplicationFactor:2),AllReplicas:4,5) for partition [requests_stored,7] in response to UpdateMetadata request sent by controller 4 epoch 53 with correlation id 70 (state.change.logger)
Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info (LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:5,ControllerEpoch:53),ReplicationFactor:1),AllReplicas:7) for partition [test,1] in response to UpdateMetadata request sent by controller 4 epoch 53 with correlation id 70 (state.change.logger)
Jun 26 23:18:07 localhost kafka: TRACE Broker 7 cached leader info (LeaderAndIsrInfo:(Leader:5,ISR:5,LeaderEpoch:17,ControllerEpoch:53),ReplicationFactor:1),AllReplicas:5) for partition [requests,6] in response to UpdateMetadata request sent by controller 4
{code}
[jira] [Updated] (KAFKA-1509) Restart of destination broker after partition move leaves partitions without leader
[ https://issues.apache.org/jira/browse/KAFKA-1509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Albert Strasheim updated KAFKA-1509:
-
Attachment: controller2.log

Controller log attached. I tried a bunch of things (partition reassignment, leader election) to make this fix itself, without luck. If there is some way to drop the broken topics by editing ZooKeeper without taking down this cluster, I'd love to know about it. I don't mind losing the topics (they were for testing).
> Restart of destination broker after partition move leaves partitions without leader
[jira] [Commented] (KAFKA-1509) Restart of destination broker after partition move leaves partitions without leader
[ https://issues.apache.org/jira/browse/KAFKA-1509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045497#comment-14045497 ] Albert Strasheim commented on KAFKA-1509:
-
Workaround: patch up all the /brokers/topics/<topic>/partitions/<partition>/state entries in ZK with the leader and ISR values you want and then apply the same config with /usr/local/kafka/bin/kafka-reassign-partitions.sh.
> Restart of destination broker after partition move leaves partitions without leader
[jira] [Comment Edited] (KAFKA-1509) Restart of destination broker after partition move leaves partitions without leader
[ https://issues.apache.org/jira/browse/KAFKA-1509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045497#comment-14045497 ] Albert Strasheim edited comment on KAFKA-1509 at 6/27/14 4:04 AM:
-
Workaround: patch up all the /brokers/topics/<topic>/partitions/<partition>/state entries in ZK with the leader and ISR values you want and then apply the same config with /usr/local/kafka/bin/kafka-preferred-replica-election.sh.

was (Author: fullung):
Workaround: patch up all the /brokers/topics/<topic>/partitions/<partition>/state entries in ZK with the leader and ISR values you want and then apply the same config with /usr/local/kafka/bin/kafka-reassign-partitions.sh.
> Restart of destination broker after partition move leaves partitions without leader
[jira] [Updated] (KAFKA-1509) Restart of destination broker after unreplicated partition move leaves partitions without leader
[ https://issues.apache.org/jira/browse/KAFKA-1509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Albert Strasheim updated KAFKA-1509:
-
Summary: Restart of destination broker after unreplicated partition move leaves partitions without leader (was: Restart of destination broker after partition move leaves partitions without leader)
> Restart of destination broker after unreplicated partition move leaves partitions without leader
[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option
[ https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14030908#comment-14030908 ] Albert Strasheim commented on KAFKA-1493:
-
What does the format look like for a Snappy compressed message? One might simply need a varint-encoded field for the uncompressed length, followed by a compressed block. The LZ4 streaming format, with the xxhash and so on, might be overkill.
> Use a well-documented LZ4 compression format and remove redundant LZ4HC option
> ------------------------------------------------------------------------------
>
> Key: KAFKA-1493
> URL: https://issues.apache.org/jira/browse/KAFKA-1493
> Project: Kafka
> Issue Type: Improvement
> Reporter: James Oliver
> Fix For: 0.8.2
--
This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Comment Edited] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option
[ https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14030908#comment-14030908 ] Albert Strasheim edited comment on KAFKA-1493 at 6/13/14 5:59 PM:
-
What does the compression format look like for a Snappy compressed message? One might simply need a varint-encoded field for the uncompressed length, followed by a compressed block. The LZ4 streaming format, with the xxhash and so on, might be overkill.

was (Author: fullung):
What does the format look like for a Snappy compressed message? One might simply need a varint-encoded field for the uncompressed length followed by a compressed block. The LZ4 streaming format and the xxhash, etc. in there might be overkill.
> Use a well-documented LZ4 compression format and remove redundant LZ4HC option
> ------------------------------------------------------------------------------
>
> Key: KAFKA-1493
> URL: https://issues.apache.org/jira/browse/KAFKA-1493
> Project: Kafka
> Issue Type: Improvement
> Reporter: James Oliver
> Fix For: 0.8.2
--
This message was sent by Atlassian JIRA (v6.2#6252)
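The framing this comment proposes (just a varint-encoded uncompressed length followed by one compressed block, with no streaming container or xxhash) is simple enough to prototype with the standard library. In this sketch, compress/flate stands in for a raw LZ4 block codec purely for illustration; the `frame`/`unframe` names are my own, not from any Kafka code:

```go
package main

import (
	"bytes"
	"compress/flate"
	"encoding/binary"
	"fmt"
	"io"
)

// frame prepends the varint-encoded uncompressed length to a single
// compressed block -- the minimal format the comment proposes.
// flate is a stand-in; a real implementation would emit raw LZ4 blocks.
func frame(payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	var hdr [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(hdr[:], uint64(len(payload)))
	buf.Write(hdr[:n])
	w, err := flate.NewWriter(&buf, flate.DefaultCompression)
	if err != nil {
		return nil, err
	}
	if _, err := w.Write(payload); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// unframe reads the declared uncompressed length, decompresses the block,
// and checks the result against the header -- the only validation needed
// in this minimal format.
func unframe(framed []byte) ([]byte, error) {
	r := bytes.NewReader(framed)
	size, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, err
	}
	out, err := io.ReadAll(flate.NewReader(r))
	if err != nil {
		return nil, err
	}
	if uint64(len(out)) != size {
		return nil, fmt.Errorf("length mismatch: got %d, header says %d", len(out), size)
	}
	return out, nil
}

func main() {
	framed, _ := frame([]byte("a reasonably high volume access log line"))
	out, _ := unframe(framed)
	fmt.Println(string(out))
}
```

Because the uncompressed size is known up front, a consumer can allocate the output buffer once, which is part of the appeal over a streaming container.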
[jira] [Commented] (KAFKA-1456) Add LZ4 and LZ4C as a compression codec
[ https://issues.apache.org/jira/browse/KAFKA-1456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14029266#comment-14029266 ] Albert Strasheim commented on KAFKA-1456:
-
Hi all. Did Stephan's concerns get addressed? Pretty please (with sugar on top) don't leave the LZ4 compression format as it stands in github.com/jpountz/lz4-java right now.
> Add LZ4 and LZ4C as a compression codec
> ---------------------------------------
>
> Key: KAFKA-1456
> URL: https://issues.apache.org/jira/browse/KAFKA-1456
> Project: Kafka
> Issue Type: Improvement
> Reporter: Joe Stein
> Labels: newbie
> Fix For: 0.8.2
> Attachments: KAFKA-1456.patch, KAFKA-1456_2014-05-19_15:01:10.patch, KAFKA-1456_2014-05-19_16:39:01.patch, KAFKA-1456_2014-05-19_18:19:32.patch, KAFKA-1456_2014-05-19_23:24:27.patch
--
This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1449) Extend wire protocol to allow CRC32C
[ https://issues.apache.org/jira/browse/KAFKA-1449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13999157#comment-13999157 ] Albert Strasheim commented on KAFKA-1449:
-
Maybe MagicByte also needs to be incremented before introducing the Attributes change, but that might break more consumers than necessary.
> Extend wire protocol to allow CRC32C
> ------------------------------------
>
> Key: KAFKA-1449
> URL: https://issues.apache.org/jira/browse/KAFKA-1449
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Albert Strasheim
> Assignee: Neha Narkhede
> Fix For: 0.9.0
>
> Howdy
>
> We are currently building out a number of Kafka consumers in Go, based on a patched version of the Sarama library that Shopify released a while back. We have a reasonably fast serialization protocol (Cap'n Proto), a 10G network and lots of cores. We have various consumers computing all kinds of aggregates on a reasonably high volume access log stream (1.1e6 messages/sec peak, about 500-600 bytes per message uncompressed).
>
> When profiling our consumer, our single hottest function (until we disabled it) was the CRC32 checksum validation, since the deserialization and aggregation in these consumers is pretty cheap.
>
> We believe things could be improved by extending the wire protocol to support CRC-32C (Castagnoli), since SSE 4.2 has an instruction to accelerate its calculation. https://en.wikipedia.org/wiki/SSE4#SSE4.2 It might be hard to use from Java, but consumers written in most other languages will benefit a lot.
>
> To give you an idea, here are some benchmarks for the Go CRC32 functions running on an Intel(R) Core(TM) i7-3540M CPU @ 3.00GHz core:
>
> {code}
> BenchmarkCrc32KB              90196 ns/op   363.30 MB/s
> BenchmarkCrcCastagnoli32KB     3404 ns/op  9624.42 MB/s
> {code}
>
> I believe BenchmarkCrc32 written in C would do about 600-700 MB/sec, and the CRC32-C speed should be close to what one achieves in Go.
>
> (Met Todd and Clark at the meetup last night. Thanks for the great presentation!)
-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Comment Edited] (KAFKA-1449) Extend wire protocol to allow CRC32C
[ https://issues.apache.org/jira/browse/KAFKA-1449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13998466#comment-13998466 ] Albert Strasheim edited comment on KAFKA-1449 at 5/15/14 5:49 AM:
--
The Attributes field in Message has 6 bits to play with. Maybe if the 3rd bit is set, it could mean that the Crc field is a CRC32C instead of a CRC32.
New producers producing messages for new consumers could choose to set the third bit and use CRC32C instead of CRC32. New consumers will check for the bit being set and verify the checksum as CRC32C instead of CRC32.
A new producer for a stream with old consumers should produce CRC32 messages. If it doesn't, old consumers that don't check the 3rd bit will verify the CRC32C checksum as a CRC32, which should fail. That seems like a good enough outcome: old consumers can't read data from new producers that choose to use the new checksum.
Old producers continue to use CRC32, and new consumers will continue to verify with CRC32, which will be slow. To fix this, upgrade your producer code. If Kafka does any internal verification, it will have to be updated to check for the 3rd bit being set and verify the checksum accordingly. Did I miss anything?
P.S. This is probably a separate JIRA, but for compression it would be nice to support LZ4 too. LZ4 and LZ4HC have compression speeds similar to Snappy, but much better decompression speeds. Benchmarks here: https://code.google.com/p/lz4/
Keeping this in mind, maybe the 3rd bit should also be reserved for evolving the available compression algorithms and the 4th bit can be for CRC32/CRC32C?
[jira] [Commented] (KAFKA-1449) Extend wire protocol to allow CRC32C
[ https://issues.apache.org/jira/browse/KAFKA-1449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13998466#comment-13998466 ] Albert Strasheim commented on KAFKA-1449:
-
The Attributes field in Message has 6 bits to play with. Maybe if the 3rd bit is set, it could mean that the Crc field is a CRC32C instead of a CRC32.
New CRC32C producers producing messages for new consumers could choose to set the third bit and use CRC32C instead of CRC32. New consumers will check for the bit being set and verify the checksum as CRC32C instead of CRC32.
A new producer for a stream with old consumers should produce CRC32 messages. If it doesn't, old consumers that don't check the 3rd bit will verify the CRC32C checksum as a CRC32, which should fail. That seems like a good enough outcome: old consumers can't read data from new producers that choose to use the new checksum.
Old producers continue to use CRC32, and new consumers will continue to verify with CRC32, which will be slow. To fix this, upgrade your producer code. If Kafka does any internal verification, it will have to be updated to check for the 3rd bit being set and verify the checksum accordingly. Did I miss anything?
P.S. This is probably a separate JIRA, but for compression it would be nice to support LZ4 too. LZ4 and LZ4HC have compression speeds similar to Snappy, but much better decompression speeds. Benchmarks here: https://code.google.com/p/lz4/
Keeping this in mind, maybe the 3rd bit should also be reserved for evolving the available compression algorithms and the 4th bit can be for CRC32/CRC32C?
[jira] [Comment Edited] (KAFKA-1449) Extend wire protocol to allow CRC32C
[ https://issues.apache.org/jira/browse/KAFKA-1449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13998466#comment-13998466 ] Albert Strasheim edited comment on KAFKA-1449 at 5/15/14 5:50 AM:
--
The Attributes field in Message has 6 bits to play with. Maybe if the 3rd bit is set, it could mean that the Crc field is a CRC32C instead of a CRC32.
New producers producing messages for new consumers could choose to set the third bit and use CRC32C instead of CRC32. New consumers will check for the bit being set and verify the checksum as CRC32C instead of CRC32.
A new producer for a stream with old consumers should produce CRC32 messages. If it doesn't, old consumers that don't check the 3rd bit will verify the CRC32C checksum as a CRC32, which should fail. That seems like a good enough outcome: old consumers can't read data from new producers that choose to use the new checksum.
Old producers continue to use CRC32, and new consumers will continue to verify with CRC32 since the 3rd bit should be zero according to the protocol spec, which will be slow. To go fast, upgrade your producer code first and start using CRC32C there. If Kafka does any internal verification, it will have to be updated to check for the 3rd bit being set and verify the checksum accordingly. Did I miss anything?
P.S. This is probably a separate JIRA, but for compression it would be nice to support LZ4 too. LZ4 and LZ4HC have compression speeds similar to Snappy, but much better decompression speeds. Benchmarks here: https://code.google.com/p/lz4/
Keeping this in mind, maybe the 3rd bit should also be reserved for evolving the available compression algorithms and the 4th bit can be for CRC32/CRC32C?
[jira] [Updated] (KAFKA-1449) Extend wire protocol to allow CRC32C
[ https://issues.apache.org/jira/browse/KAFKA-1449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Albert Strasheim updated KAFKA-1449:
Description:
Howdy
We are currently building out a number of Kafka consumers in Go, based on a patched version of the Sarama library that Shopify released a while back. We have a reasonably fast serialization protocol (Cap'n Proto), a 10G network and lots of cores. We have various consumers computing all kinds of aggregates on a reasonably high-volume access log stream (1.1e6 messages/sec peak, about 500-600 bytes per message uncompressed).
When profiling our consumer, our single hottest function (until we disabled it) was the CRC32 checksum validation, since the deserialization and aggregation in these consumers is pretty cheap.
We believe things could be improved by extending the wire protocol to support CRC-32C (Castagnoli), since SSE 4.2 has an instruction to accelerate its calculation: https://en.wikipedia.org/wiki/SSE4#SSE4.2
It might be hard to use from Java, but consumers written in most other languages will benefit a lot. To give you an idea, here are some benchmarks for the Go CRC32 functions running on an Intel(R) Core(TM) i7-3540M CPU @ 3.00GHz core:
BenchmarkCrc32KB              90196 ns/op   363.30 MB/s
BenchmarkCrcCastagnoli32KB     3404 ns/op  9624.42 MB/s
I believe BenchmarkCrc32 written in C would do about 600-700 MB/sec, and the CRC32-C speed should be close to what one achieves in Go.
(Met Todd and Clark at the meetup last night. Thanks for the great presentation!)
[jira] [Created] (KAFKA-1449) Extend wire protocol to allow CRC32C
Albert Strasheim created KAFKA-1449:
---
Summary: Extend wire protocol to allow CRC32C
Key: KAFKA-1449
URL: https://issues.apache.org/jira/browse/KAFKA-1449
Project: Kafka
Issue Type: Improvement
Components: consumer
Reporter: Albert Strasheim
Assignee: Neha Narkhede
Fix For: 0.9.0

Howdy
We are currently building out a number of Kafka consumers in Go, based on a patched version of the Sarama library that Shopify released a while back. We have a reasonably fast serialization protocol (Cap'n Proto), a 10G network and lots of cores. We have various consumers computing all kinds of aggregates on a reasonably high-volume access log stream (1e6 messages/sec peak, about 500-600 bytes per message uncompressed).
When profiling our consumer, our single hottest function (until we disabled it) was the CRC32 checksum validation, since the deserialization and aggregation in these consumers is pretty cheap.
We believe things could be improved by extending the wire protocol to support CRC-32C (Castagnoli), since SSE 4.2 has an instruction to accelerate its calculation: https://en.wikipedia.org/wiki/SSE4#SSE4.2
It might be hard to use from Java, but consumers written in most other languages will benefit a lot. To give you an idea, here are some benchmarks for the Go CRC32 functions running on an Intel(R) Core(TM) i7-3540M CPU @ 3.00GHz core:
BenchmarkCrc32KB              90196 ns/op   363.30 MB/s
BenchmarkCrcCastagnoli32KB     3404 ns/op  9624.42 MB/s
I believe BenchmarkCrc32 written in C would do about 600-700 MB/sec, and the CRC32-C speed should be close to what one achieves in Go.
(Met Todd and Clark at the meetup last night. Thanks for the great presentation!)