I found that one big contributor to the poor performance was a bug in my custom partitioner (a missing Utils.toPositive call). I also found that the default partitioner's use of murmur2 is very expensive compared to a simple hash: about a 5X perf degradation. As you can see below, with a good custom partitioner I can achieve 1875 messages/s, while the default partitioner only gets me to 381 messages/s.
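For reference, here is a cleaned-up sketch of the fast variant (the class name is made up, and this is a sketch rather than the exact class I run): it assumes keys are Long ids, keeps the "consecutive 1000 ids share a partition" behavior, and uses Math.floorMod instead of Utils.toPositive so the missing-toPositive bug cannot come back. It throws a plain IllegalArgumentException instead of the InvalidRecordException used in my notes below, just to keep the example self-contained.

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class LongKeyBlockPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null || !(key instanceof Long))
            // my notes below throw InvalidRecordException; any unchecked exception works here
            throw new IllegalArgumentException("We expect every message to have a Long key");
        long block = (Long) key / 1000;                 // co-locate consecutive blocks of 1000 ids
        int numPartitions = cluster.partitionCountForTopic(topic);
        // a plain modulo is ~5X cheaper than murmur2 in my tests; floorMod keeps the result non-negative
        return (int) Math.floorMod(block, (long) numPartitions);
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}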
Just to share my findings, here are my notes:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if ((keyBytes == null) || (!(key instanceof Long)))
        throw new InvalidRecordException("We expect all messages to have a Long as key");
    // equivalent to the default partitioner: murmur2 on the serialized key
    return Utils.toPositive(Utils.murmur2(keyBytes)) % cluster.partitionCountForTopic(topic);
}
381 messages/s, 2.6 MB/s

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if ((keyBytes == null) || (!(key instanceof Long)))
        throw new InvalidRecordException("We expect all messages to have a Long as key");
    long k = (long) key / 1000; // consecutive 1000 ids are co-located
    // bug: missing Utils.toPositive, so murmur2 can produce a negative partition number
    return Utils.murmur2(toBytes(k)) % cluster.partitionCountForTopic(topic);
}

private static byte[] toBytes(long val) {
    byte[] b = new byte[8];
    for (int i = 7; i > 0; i--) {
        b[i] = (byte) val;
        val >>>= 8;
    }
    b[0] = (byte) val;
    return b;
}
205 messages/s, 1.4 MB/s, including some errors

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if ((keyBytes == null) || (!(key instanceof Long)))
        throw new InvalidRecordException("We expect all messages to have a Long as key");
    long k = (long) key / 1000; // consecutive 1000 ids are co-located
    return Utils.toPositive(Utils.murmur2(toBytes(k))) % cluster.partitionCountForTopic(topic);
}
381 messages/s, 2.6 MB/s

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if ((keyBytes == null) || (!(key instanceof Long)))
        throw new InvalidRecordException("We expect all messages to have a Long as key");
    long k = (long) key / 1000; // consecutive 1000 ids are co-located
    return Long.hashCode(k) % cluster.partitionCountForTopic(topic);
}
1865 messages/s, 3.7 MB/s

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if ((keyBytes == null) || (!(key instanceof Long)))
        throw new InvalidRecordException("We expect all messages to have a Long as key");
    long k = (long) key / 1000; // consecutive 1000 ids are co-located
    return (int) (k % cluster.partitionCountForTopic(topic));
}
1875 messages/s, 3.7 MB/s

-----Original Message-----
From: Eric Owhadi <eric.owh...@esgyn.com>
Sent: Thursday, October 3, 2019 4:34 PM
To: users@kafka.apache.org
Subject: RE: poor producing performance with very low CPU utilization?

External

To test whether my backend was the bottleneck, I tried kafka-producer-perf-test. Boy, that was fast! Instead of my lame 200 messages per second, I am getting 20,000 messages per second, a 100X improvement. That is more in line with my expectations. Granted, this test does not use my custom partitioner and serializer. I will try adding variables one after the other, but the bottleneck is definitely not the server :-).

kafka-producer-perf-test --num-records 6000000 --record-size 12016 --topic DEFAULT-.TIMESERIES.SmartpumpCollectorVector --throughput 1000000 --print-metrics --producer-props bootstrap.servers=nap052.esgyn.local:9092,localhost:9092 compression.type=snappy batch.size=65536 acks=all linger.ms=85

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.15.0-1.cdh5.15.0.p0.21/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/ESGYNDB-2.7.0-A1/traf_home/export/lib/orc-tools-1.5.0-uber.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
19/10/03 14:19:33 INFO producer.ProducerConfig: ProducerConfig values:
    acks = all
    batch.size = 65536
    bootstrap.servers = [xxx.esgyn.local:9092, localhost:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id =
    compression.type = snappy
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 85
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
19/10/03 14:19:33 INFO utils.AppInfoParser: Kafka version: 2.2.1-kafka-4.1.0
19/10/03 14:19:33 INFO utils.AppInfoParser: Kafka commitId: unknown
19/10/03 14:19:33 WARN clients.NetworkClient: [Producer clientId=producer-1] Connection to node -2 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
19/10/03 14:19:33 WARN clients.NetworkClient: [Producer clientId=producer-1] Connection to node -2 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
19/10/03 14:19:33 INFO clients.Metadata: Cluster ID: t4ZcV5AwRq6AK0scvEyf6w
128382 records sent, 25630.3 records/sec (293.71 MB/sec), 106.6 ms avg latency, 630.0 ms max latency.
137267 records sent, 27447.9 records/sec (314.54 MB/sec), 103.1 ms avg latency, 618.0 ms max latency.
154956 records sent, 30960.2 records/sec (354.78 MB/sec), 108.1 ms avg latency, 523.0 ms max latency. 160987 records sent, 32165.2 records/sec (368.59 MB/sec), 109.2 ms avg latency, 583.0 ms max latency. 156126 records sent, 31194.0 records/sec (357.46 MB/sec), 110.2 ms avg latency, 890.0 ms max latency. 143638 records sent, 28727.6 records/sec (329.20 MB/sec), 100.5 ms avg latency, 517.0 ms max latency. 81710 records sent, 13403.9 records/sec (153.60 MB/sec), 144.7 ms avg latency, 2654.0 ms max latency. 5414 records sent, 1036.6 records/sec (11.88 MB/sec), 2016.0 ms avg latency, 7246.0 ms max latency. 82215 records sent, 16423.3 records/sec (188.20 MB/sec), 307.1 ms avg latency, 8694.0 ms max latency. 88897 records sent, 17768.7 records/sec (203.62 MB/sec), 167.4 ms avg latency, 1725.0 ms max latency. 151365 records sent, 30248.8 records/sec (346.63 MB/sec), 108.6 ms avg latency, 493.0 ms max latency. 156538 records sent, 31301.3 records/sec (358.69 MB/sec), 109.5 ms avg latency, 523.0 ms max latency. 130701 records sent, 26114.1 records/sec (299.25 MB/sec), 113.8 ms avg latency, 674.0 ms max latency. 67604 records sent, 13496.5 records/sec (154.66 MB/sec), 197.0 ms avg latency, 2677.0 ms max latency. 65918 records sent, 13181.0 records/sec (151.05 MB/sec), 184.6 ms avg latency, 2619.0 ms max latency. 101673 records sent, 20314.3 records/sec (232.79 MB/sec), 132.8 ms avg latency, 1593.0 ms max latency. 120567 records sent, 24089.3 records/sec (276.05 MB/sec), 125.1 ms avg latency, 847.0 ms max latency. 153552 records sent, 30685.9 records/sec (351.64 MB/sec), 111.2 ms avg latency, 501.0 ms max latency. 148710 records sent, 29730.1 records/sec (340.69 MB/sec), 109.8 ms avg latency, 596.0 ms max latency. 90535 records sent, 17945.5 records/sec (205.64 MB/sec), 140.3 ms avg latency, 1202.0 ms max latency. 43865 records sent, 8641.6 records/sec (99.03 MB/sec), 216.2 ms avg latency, 2082.0 ms max latency. 56323 records sent, 11257.8 records/sec (129.01 MB/sec), 389.2 ms avg latency, 3426.0 ms max latency. 136051 records sent, 27204.8 records/sec (311.75 MB/sec), 110.0 ms avg latency, 648.0 ms max latency. 85620 records sent, 17120.6 records/sec (196.19 MB/sec), 189.7 ms avg latency, 2418.0 ms max latency. 141306 records sent, 28261.2 records/sec (323.85 MB/sec), 101.9 ms avg latency, 536.0 ms max latency. 140048 records sent, 28009.6 records/sec (320.97 MB/sec), 108.6 ms avg latency, 566.0 ms max latency. 109297 records sent, 21841.9 records/sec (250.29 MB/sec), 154.9 ms avg latency, 1034.0 ms max latency. 60731 records sent, 11715.1 records/sec (134.25 MB/sec), 179.8 ms avg latency, 2625.0 ms max latency. 24637 records sent, 4919.5 records/sec (56.37 MB/sec), 810.5 ms avg latency, 3651.0 ms max latency. 140499 records sent, 28071.7 records/sec (321.68 MB/sec), 107.1 ms avg latency, 593.0 ms max latency. 82061 records sent, 16408.9 records/sec (188.04 MB/sec), 197.2 ms avg latency, 1949.0 ms max latency. 151387 records sent, 30204.9 records/sec (346.13 MB/sec), 101.1 ms avg latency, 582.0 ms max latency. 126133 records sent, 25181.3 records/sec (288.56 MB/sec), 121.5 ms avg latency, 749.0 ms max latency. 98287 records sent, 19637.8 records/sec (225.04 MB/sec), 148.9 ms avg latency, 1070.0 ms max latency. 57611 records sent, 11515.3 records/sec (131.96 MB/sec), 228.0 ms avg latency, 1301.0 ms max latency. 54744 records sent, 10940.0 records/sec (125.37 MB/sec), 219.9 ms avg latency, 3090.0 ms max latency. 
127183 records sent, 25411.2 records/sec (291.20 MB/sec), 130.2 ms avg latency, 1317.0 ms max latency. 114095 records sent, 22782.5 records/sec (261.07 MB/sec), 128.0 ms avg latency, 827.0 ms max latency. 135414 records sent, 27066.6 records/sec (310.17 MB/sec), 114.4 ms avg latency, 963.0 ms max latency. 89328 records sent, 17865.6 records/sec (204.73 MB/sec), 155.0 ms avg latency, 1446.0 ms max latency. 117350 records sent, 23441.9 records/sec (268.63 MB/sec), 140.3 ms avg latency, 1357.0 ms max latency. 59659 records sent, 11927.0 records/sec (136.68 MB/sec), 287.4 ms avg latency, 2629.0 ms max latency. 121268 records sent, 24239.1 records/sec (277.76 MB/sec), 148.9 ms avg latency, 2729.0 ms max latency. 125586 records sent, 25117.2 records/sec (287.83 MB/sec), 125.8 ms avg latency, 823.0 ms max latency. 135528 records sent, 27078.5 records/sec (310.30 MB/sec), 112.3 ms avg latency, 792.0 ms max latency. 147679 records sent, 29535.8 records/sec (338.46 MB/sec), 105.7 ms avg latency, 524.0 ms max latency. 104386 records sent, 20868.9 records/sec (239.14 MB/sec), 148.7 ms avg latency, 1920.0 ms max latency. 103882 records sent, 20776.4 records/sec (238.08 MB/sec), 157.5 ms avg latency, 2064.0 ms max latency. 42445 records sent, 8451.8 records/sec (96.85 MB/sec), 252.5 ms avg latency, 2314.0 ms max latency. 93666 records sent, 18722.0 records/sec (214.54 MB/sec), 226.7 ms avg latency, 3462.0 ms max latency. 101172 records sent, 20190.0 records/sec (231.36 MB/sec), 140.0 ms avg latency, 1601.0 ms max latency. 91260 records sent, 18241.1 records/sec (209.03 MB/sec), 135.5 ms avg latency, 1447.0 ms max latency. 157822 records sent, 31545.5 records/sec (361.49 MB/sec), 110.2 ms avg latency, 488.0 ms max latency. 107820 records sent, 21525.3 records/sec (246.67 MB/sec), 131.8 ms avg latency, 859.0 ms max latency. 65499 records sent, 12302.6 records/sec (140.98 MB/sec), 180.2 ms avg latency, 2417.0 ms max latency. 87593 records sent, 17501.1 records/sec (200.55 MB/sec), 229.3 ms avg latency, 1865.0 ms max latency. 6000000 records sent, 21145.896110 records/sec (242.32 MB/sec), 144.03 ms avg latency, 8694.00 ms max latency, 91 ms 50th, 359 ms 95th, 1160 ms 99th, 3182 ms 99.9th. 
Metric Name Value app-info:commit-id:{client-id=producer-1} : unknown app-info:version:{client-id=producer-1} : 2.2.1-kafka-4.1.0 kafka-metrics-count:count:{client-id=producer-1} : 125.000 producer-metrics:batch-size-avg:{client-id=producer-1} : 27928.990 producer-metrics:batch-size-max:{client-id=producer-1} : 53510.000 producer-metrics:batch-split-rate:{client-id=producer-1} : 0.000 producer-metrics:batch-split-total:{client-id=producer-1} : 0.000 producer-metrics:buffer-available-bytes:{client-id=producer-1} : 33554432.000 producer-metrics:buffer-exhausted-rate:{client-id=producer-1} : 0.000 producer-metrics:buffer-exhausted-total:{client-id=producer-1} : 0.000 producer-metrics:buffer-total-bytes:{client-id=producer-1} : 33554432.000 producer-metrics:bufferpool-wait-ratio:{client-id=producer-1} : 0.627 producer-metrics:bufferpool-wait-time-total:{client-id=producer-1} : 163044386821.000 producer-metrics:compression-rate-avg:{client-id=producer-1} : 0.605 producer-metrics:connection-close-rate:{client-id=producer-1} : 0.000 producer-metrics:connection-close-total:{client-id=producer-1} : 2.000 producer-metrics:connection-count:{client-id=producer-1} : 3.000 producer-metrics:connection-creation-rate:{client-id=producer-1} : 0.000 producer-metrics:connection-creation-total:{client-id=producer-1} : 3.000 producer-metrics:failed-authentication-rate:{client-id=producer-1} : 0.000 producer-metrics:failed-authentication-total:{client-id=producer-1} : 0.000 producer-metrics:failed-reauthentication-rate:{client-id=producer-1} : 0.000 producer-metrics:failed-reauthentication-total:{client-id=producer-1} : 0.000 producer-metrics:incoming-byte-rate:{client-id=producer-1} : 122909.831 producer-metrics:incoming-byte-total:{client-id=producer-1} : 36624593.000 producer-metrics:io-ratio:{client-id=producer-1} : 0.103 producer-metrics:io-time-ns-avg:{client-id=producer-1} : 155782.854 producer-metrics:io-wait-ratio:{client-id=producer-1} : 0.423 producer-metrics:io-wait-time-ns-avg:{client-id=producer-1} : 640556.224 producer-metrics:io-waittime-total:{client-id=producer-1} : 102808922802.000 producer-metrics:iotime-total:{client-id=producer-1} : 33032357708.000 producer-metrics:metadata-age:{client-id=producer-1} : 283.529 producer-metrics:network-io-rate:{client-id=producer-1} : 244.207 producer-metrics:network-io-total:{client-id=producer-1} : 72510.000 producer-metrics:outgoing-byte-rate:{client-id=producer-1} : 106935747.320 producer-metrics:outgoing-byte-total:{client-id=producer-1} : 33992901291.000 producer-metrics:produce-throttle-time-avg:{client-id=producer-1} : 0.000 producer-metrics:produce-throttle-time-max:{client-id=producer-1} : 0.000 producer-metrics:reauthentication-latency-avg:{client-id=producer-1} : NaN producer-metrics:reauthentication-latency-max:{client-id=producer-1} : NaN producer-metrics:record-error-rate:{client-id=producer-1} : 0.000 producer-metrics:record-error-total:{client-id=producer-1} : 0.000 producer-metrics:record-queue-time-avg:{client-id=producer-1} : 100.057 producer-metrics:record-queue-time-max:{client-id=producer-1} : 3440.000 producer-metrics:record-retry-rate:{client-id=producer-1} : 0.000 producer-metrics:record-retry-total:{client-id=producer-1} : 0.000 producer-metrics:record-send-rate:{client-id=producer-1} : 18598.670 producer-metrics:record-send-total:{client-id=producer-1} : 6000000.000 producer-metrics:record-size-avg:{client-id=producer-1} : 12103.000 producer-metrics:record-size-max:{client-id=producer-1} : 12103.000 
producer-metrics:records-per-request-avg:{client-id=producer-1} : 152.407 producer-metrics:request-latency-avg:{client-id=producer-1} : 30.006 producer-metrics:request-latency-max:{client-id=producer-1} : 1693.000 producer-metrics:request-rate:{client-id=producer-1} : 122.077 producer-metrics:request-size-avg:{client-id=producer-1} : 875950.384 producer-metrics:request-size-max:{client-id=producer-1} : 1054112.000 producer-metrics:request-total:{client-id=producer-1} : 36255.000 producer-metrics:requests-in-flight:{client-id=producer-1} : 0.000 producer-metrics:response-rate:{client-id=producer-1} : 122.141 producer-metrics:response-total:{client-id=producer-1} : 36255.000 producer-metrics:select-rate:{client-id=producer-1} : 659.815 producer-metrics:select-total:{client-id=producer-1} : 197285.000 producer-metrics:successful-authentication-no-reauth-total:{client-id=producer-1} : 0.000 producer-metrics:successful-authentication-rate:{client-id=producer-1} : 0.000 producer-metrics:successful-authentication-total:{client-id=producer-1} : 0.000 producer-metrics:successful-reauthentication-rate:{client-id=producer-1} : 0.000 producer-metrics:successful-reauthentication-total:{client-id=producer-1} : 0.000 producer-metrics:waiting-threads:{client-id=producer-1} : 0.000 producer-node-metrics:incoming-byte-rate:{client-id=producer-1, node-id=node--1} : 0.000 producer-node-metrics:incoming-byte-rate:{client-id=producer-1, node-id=node--2} : 0.000 producer-node-metrics:incoming-byte-rate:{client-id=producer-1, node-id=node-357} : 61091.292 producer-node-metrics:incoming-byte-rate:{client-id=producer-1, node-id=node-358} : 62046.566 producer-node-metrics:incoming-byte-total:{client-id=producer-1, node-id=node--1} : 8843.000 producer-node-metrics:incoming-byte-total:{client-id=producer-1, node-id=node--2} : 0.000 producer-node-metrics:incoming-byte-total:{client-id=producer-1, node-id=node-357} : 18882528.000 producer-node-metrics:incoming-byte-total:{client-id=producer-1, node-id=node-358} : 17733222.000 producer-node-metrics:outgoing-byte-rate:{client-id=producer-1, node-id=node--1} : 0.000 producer-node-metrics:outgoing-byte-rate:{client-id=producer-1, node-id=node--2} : 0.000 producer-node-metrics:outgoing-byte-rate:{client-id=producer-1, node-id=node-357} : 53121988.622 producer-node-metrics:outgoing-byte-rate:{client-id=producer-1, node-id=node-358} : 54009460.328 producer-node-metrics:outgoing-byte-total:{client-id=producer-1, node-id=node--1} : 99.000 producer-node-metrics:outgoing-byte-total:{client-id=producer-1, node-id=node--2} : 0.000 producer-node-metrics:outgoing-byte-total:{client-id=producer-1, node-id=node-357} : 17091951276.000 producer-node-metrics:outgoing-byte-total:{client-id=producer-1, node-id=node-358} : 16900949916.000 producer-node-metrics:request-latency-avg:{client-id=producer-1, node-id=node--1} : NaN producer-node-metrics:request-latency-avg:{client-id=producer-1, node-id=node--2} : NaN producer-node-metrics:request-latency-avg:{client-id=producer-1, node-id=node-357} : 30.531 producer-node-metrics:request-latency-avg:{client-id=producer-1, node-id=node-358} : 29.481 producer-node-metrics:request-latency-max:{client-id=producer-1, node-id=node--1} : NaN producer-node-metrics:request-latency-max:{client-id=producer-1, node-id=node--2} : NaN producer-node-metrics:request-latency-max:{client-id=producer-1, node-id=node-357} : 1430.000 producer-node-metrics:request-latency-max:{client-id=producer-1, node-id=node-358} : 1693.000 
producer-node-metrics:request-rate:{client-id=producer-1, node-id=node--1} : 0.000 producer-node-metrics:request-rate:{client-id=producer-1, node-id=node--2} : 0.000 producer-node-metrics:request-rate:{client-id=producer-1, node-id=node-357} : 61.032 producer-node-metrics:request-rate:{client-id=producer-1, node-id=node-358} : 61.253 producer-node-metrics:request-size-avg:{client-id=producer-1, node-id=node--1} : NaN producer-node-metrics:request-size-avg:{client-id=producer-1, node-id=node--2} : NaN producer-node-metrics:request-size-avg:{client-id=producer-1, node-id=node-357} : 870389.622 producer-node-metrics:request-size-avg:{client-id=producer-1, node-id=node-358} : 881740.177 producer-node-metrics:request-size-max:{client-id=producer-1, node-id=node--1} : NaN producer-node-metrics:request-size-max:{client-id=producer-1, node-id=node--2} : NaN producer-node-metrics:request-size-max:{client-id=producer-1, node-id=node-357} : 1053528.000 producer-node-metrics:request-size-max:{client-id=producer-1, node-id=node-358} : 1054112.000 producer-node-metrics:request-total:{client-id=producer-1, node-id=node--1} : 2.000 producer-node-metrics:request-total:{client-id=producer-1, node-id=node--2} : 0.000 producer-node-metrics:request-total:{client-id=producer-1, node-id=node-357} : 18232.000 producer-node-metrics:request-total:{client-id=producer-1, node-id=node-358} : 18021.000 producer-node-metrics:response-rate:{client-id=producer-1, node-id=node--1} : 0.000 producer-node-metrics:response-rate:{client-id=producer-1, node-id=node--2} : 0.000 producer-node-metrics:response-rate:{client-id=producer-1, node-id=node-357} : 61.094 producer-node-metrics:response-rate:{client-id=producer-1, node-id=node-358} : 61.272 producer-node-metrics:response-total:{client-id=producer-1, node-id=node--1} : 2.000 producer-node-metrics:response-total:{client-id=producer-1, node-id=node--2} : 0.000 producer-node-metrics:response-total:{client-id=producer-1, node-id=node-357} : 18232.000 producer-node-metrics:response-total:{client-id=producer-1, node-id=node-358} : 18021.000 producer-topic-metrics:byte-rate:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector} : 106864118.709 producer-topic-metrics:byte-total:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector} : 33980657526.000 producer-topic-metrics:compression-rate:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector} : 0.605 producer-topic-metrics:record-error-rate:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector} : 0.000 producer-topic-metrics:record-error-total:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector} : 0.000 producer-topic-metrics:record-retry-rate:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector} : 0.000 producer-topic-metrics:record-retry-total:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector} : 0.000 producer-topic-metrics:record-send-rate:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector} : 18598.670 producer-topic-metrics:record-send-total:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector} : 6000000.000 19/10/03 14:24:16 INFO producer.KafkaProducer: [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 
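For the "add variables one after the other" step, here is a rough sketch of a standalone driver: it reuses the settings from the perf-test command above, switches to Long keys, and plugs in only the custom partitioner as the first added variable (the partitioner class name is the one from my producer config quoted further below; the payload is just random bytes rather than my real serializer output, so treat this as a sketch, not my actual test code).

import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class StepwisePerfTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "nap052.esgyn.local:9092,localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("compression.type", "snappy");
        props.put("batch.size", "65536");
        props.put("acks", "all");
        props.put("linger.ms", "85");
        // step 1: add only the custom partitioner, keep everything else as in the perf test
        props.put("partitioner.class", "org.trafodion.sql.kafka.TimeSeriesPartitioner");

        byte[] payload = new byte[12016];           // same record size as the perf test
        new Random().nextBytes(payload);
        int numRecords = 1_000_000;

        long start = System.currentTimeMillis();
        try (KafkaProducer<Long, byte[]> producer = new KafkaProducer<>(props)) {
            for (long i = 0; i < numRecords; i++) {
                producer.send(new ProducerRecord<>("DEFAULT-.TIMESERIES.SmartpumpCollectorVector", i, payload));
            }
            producer.flush();
        }
        double seconds = (System.currentTimeMillis() - start) / 1000.0;
        System.out.printf("%d records in %.1f s -> %.0f records/s%n", numRecords, seconds, numRecords / seconds);
    }
}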
-----Original Message-----
From: Eric Owhadi <eric.owh...@esgyn.com>
Sent: Thursday, October 3, 2019 3:45 PM
To: users@kafka.apache.org
Subject: RE: poor producing performance with very low CPU utilization?

External

There is a key piece of information that should be critical for guessing where the problem is: when I change from acks = all to acks = 1, instead of increasing the message rate, it actually cuts it in half! It is as if the problem is how fast I produce data: with acks = 1, I assume I block for less time in the synchronous send, and therefore my producing pump speeds up. I wonder if some sort of contention happens when the producer populates the 200 partition queues while the production rate in the user thread is high?

Eric

-----Original Message-----
From: Eric Owhadi <eric.owh...@esgyn.com>
Sent: Thursday, October 3, 2019 1:33 PM
To: users@kafka.apache.org
Subject: RE: poor producing performance with very low CPU utilization?

External

Hi Eric,
Thanks a lot for your answer. Please find my responses inline:

>> You've given hardware information about your brokers, but I don't think
>> you've provided information about the machine your producer is running on.
>> Have you verified that you're not reaching any caps on your producer's
>> machine?

The producer is on the same machine as the broker. It is running very quiet, 3% CPU, when I run my test. So no, there is no stress on the producing side.

>> I also think you might be hitting the limit of what a single producer is
>> capable of pushing through with your current setup. With record size of ~12k
>> and the default batch size configuration of 64k, you'll only be able to
>> send 5 records per batch. The default number of in flight batches is 5.

I have 200 partitions on my topic, and the load is well balanced across all partitions. So the math you are doing should be x200, right? In addition, I found that batch size had no effect, and linger.ms was the triggering factor that causes a buffer send. I played with batch size and the number of in-flight requests upward, and that had no effect.

>> This means at any given time, you'll only have 25 records in flight per
>> connection. I'm assuming your partitions are configured with at least 2
>> replicas. Acks=all means your producer is going to wait for the records to
>> be fully replicated before considering it complete.
>> Doing the math, you have ~200 records per second, but this is split between
>> 2 brokers. This means you're producing 100 records per second per broker.
>> Simplifying a bit to 25 records in flight per broker, that's a latency of
>> ~250 ms to move around 300kb. At minimum, this includes the time to
>> [compress the batch], [send the batch over the network to the leader],
>> [write the batch to the leader's log], [fetch the batch over the network to
>> the replica], [write the batch to the replica's log], and all of the
>> assorted responses to those calls.

Given that everything is local (the producer runs on the same node as the broker) and the size of my node (80 vcores), I hope I don't need 250 ms to do that... The equivalent workload on HBase 2.0 is 10 to 20X faster (and that includes the same replica config, etc.).

On Wed, Oct 2, 2019 at 8:38 PM Eric Owhadi <eric.owh...@esgyn.com> wrote:

-----Original Message-----
From: Eric Azama <eazama...@gmail.com>
Sent: Thursday, October 3, 2019 1:07 PM
To: users@kafka.apache.org
Subject: Re: poor producing performance with very low CPU utilization?
External

Hi Eric,

You've given hardware information about your brokers, but I don't think you've provided information about the machine your producer is running on. Have you verified that you're not reaching any caps on your producer's machine?

I also think you might be hitting the limit of what a single producer is capable of pushing through with your current setup. With a record size of ~12k and the default batch size configuration of 64k, you'll only be able to send 5 records per batch. The default number of in-flight batches is 5. This means at any given time, you'll only have 25 records in flight per connection. I'm assuming your partitions are configured with at least 2 replicas. Acks=all means your producer is going to wait for the records to be fully replicated before considering it complete.

Doing the math, you have ~200 records per second, but this is split between 2 brokers. This means you're producing 100 records per second per broker. Simplifying a bit to 25 records in flight per broker, that's a latency of ~250 ms to move around 300kb. At minimum, this includes the time to [compress the batch], [send the batch over the network to the leader], [write the batch to the leader's log], [fetch the batch over the network to the replica], [write the batch to the replica's log], and all of the assorted responses to those calls.

On Wed, Oct 2, 2019 at 8:38 PM Eric Owhadi <eric.owh...@esgyn.com> wrote:
> Hi Jamie,
> Thanks for the hint. I played with these parameters, and found that only
> linger.ms plays a significant role for my test case.
> It is very sensitive and highly non-linear.
> I get these results:
>
>   linger.ms    messages per second
>   80           100
>   84           205
>   85           215  <- peak
>   86           213
>   90           205
>   95           195
>   100          187
>   200          100
>
> So as you can see, this is very sensitive and one can miss the peak easily.
> However, 200 messages per second for 2 powerful nodes and relatively
> small messages (12016 bytes) is still at least 10X below what I would have
> hoped for.
> When I see system resources barely moving, with CPU at 3%,
> I am sure something is not right.
> Regards,
> Eric
>
> -----Original Message-----
> From: Jamie <jamied...@aol.co.uk.INVALID>
> Sent: Wednesday, October 2, 2019 4:27 PM
> To: users@kafka.apache.org
> Subject: Re: poor producing performance with very low CPU utilization?
>
> External
>
> Hi Eric,
> I found that increasing linger.ms to between 50-100 ms significantly
> increases performance (fewer, larger requests instead of many small
> ones). I'd also increase the batch size and the buffer.memory.
> Thanks,
> Jamie
>
>
> -----Original Message-----
> From: Eric Owhadi <eric.owh...@esgyn.com>
> To: users@kafka.apache.org <users@kafka.apache.org>
> Sent: Wed, 2 Oct 2019 16:42
> Subject: poor producing performance with very low CPU utilization?
>
> Hi Kafka users,
> I am new to Kafka and am struggling to get an acceptable producing rate.
> I am using a cluster of 2 nodes, 40 CPU cores / 80 if counting
> hyperthreading, with 256GB memory, on a 10Gbit network. Kafka is installed
> as part of a Cloudera parcel, with a 5GB Java heap.
> Producer version: Kafka client 2.2.1
>
> Wed Oct 2 07:56:59 PDT 2019
> JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-0.b11.el6_9.x86_64
> Using -XX:+HeapDumpOnOutOfMemoryError
> -XX:HeapDumpPath=/tmp/kafka_kafka-KAFKA_BROKER-c1871edf37153578a6fc7f41462d01d2_pid6908.hprof
> -XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh as CSD_JAVA_OPTS
> Using /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER as conf dir
> Using scripts/control.sh as process script
> CONF_DIR=/var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> CMF_CONF_DIR=/etc/cloudera-scm-agent
>
> Date: Wed Oct 2 07:56:59 PDT 2019
> Host: xxxxx.esgyn.local
> Pwd: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> CONF_DIR: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> KAFKA_HOME: /opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/lib/kafka
> Zookeeper Quorum: xxx.esgyn.local:2181,xxx.esgyn.local:2181,xxx.esgyn.local:2181
> Zookeeper Chroot:
> PORT: 9092
> JMX_PORT: 9393
> SSL_PORT: 9093
> ENABLE_MONITORING: true
> METRIC_REPORTERS: nl.techop.kafka.KafkaHttpMetricsReporter
> BROKER_HEAP_SIZE: 5120
> BROKER_JAVA_OPTS: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20
> -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC
> -Djava.awt.headless=true
> BROKER_SSL_ENABLED: false
> KERBEROS_AUTH_ENABLED: false
> KAFKA_PRINCIPAL:
> SECURITY_INTER_BROKER_PROTOCOL: INFERRED
> AUTHENTICATE_ZOOKEEPER_CONNECTION: true
> SUPER_USERS: kafka
> Kafka version found: 2.2.1-kafka4.1.0
> Sentry version found: 1.5.1-cdh5.15.0
> ZK_PRINCIPAL_NAME: zookeeper
> Final Zookeeper Quorum is xxx.esgyn.local:2181,xx.esgyn.local:2181,x.esgyn.local:2181
> security.inter.broker.protocol inferred as PLAINTEXT
> LISTENERS=listeners=PLAINTEXT://xxxxx.esgyn.local:9092,
>
> I am producing messages of 12016 bytes uncompressed, which are then snappy-compressed by Kafka.
> I am using a topic with 200 partitions, and a custom partitioner that
> I verified does a good job of spreading the load over the 2 brokers.
>
> My producer config looks like:
>
> kafkaProps.put("bootstrap.servers", "nap052.esgyn.local:9092,localhost:9092");
> kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
> kafkaProps.put("value.serializer", "org.trafodion.sql.kafka.SmartpumpCollectorVectorSerializer");
> kafkaProps.put("partitioner.class", "org.trafodion.sql.kafka.TimeSeriesPartitioner");
> kafkaProps.put("compression.type", "snappy");
> kafkaProps.put("batch.size", "65536");
> kafkaProps.put("acks", "all");
> kafkaProps.put("linger.ms", "1");
>
> I first tried doing fire-and-forget sends, thinking I would get the best
> performance.
> Then I tried synchronous sends, and amazingly found that I get better
> performance with sync sends.
> However, after 1 or 2 minutes of load testing, I start getting errors on
> the synchronous send like this:
>
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
>         at org.trafodion.sql.kafka.TimeseriesEndPoint$customHandler.handle(TimeseriesEndPoint.java:315)
>         at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>         at org.eclipse.jetty.server.Server.handle(Server.java:505)
>         at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
>         at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
>         at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
>         at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
>         at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
>         at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
>         at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:781)
>         at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:917)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
>
> So I suspect that I am producing too fast and the brokers cannot catch up.
> I tried bumping the number of I/O threads up from the default 8 to 40; that did not help.
>
> I am getting a producing rate of only about 100 messages per second,
> and about 1 megabyte per second according to the Kafka metrics.
> CPU utilization is barely noticeable (3%), the network is essentially
> unaffected, and from googling around, this is not the kind of performance I
> should expect out of my config. I was hoping for at least 10X more, if
> not 100X. Were my expectations too high, or am I missing something in the
> config that is causing these performance numbers?
>
> Some details: I produce from a custom Jetty handler that I verified to be
> super fast when I am not producing (commenting out the send()), and I am
> using a single producer (I also tried with 2) reused across all Jetty
> threads.
>
> Any help/clue would be much appreciated. Thanks in advance,
> Eric Owhadi
> Esgyn Corporation.
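On the fire-and-forget vs. synchronous send comparison in the quoted message above: the producer also supports asynchronous sends with a callback, which avoids blocking the caller for each record while still surfacing failures such as the TimeoutException shown. A minimal sketch (the class and method names here are made up for illustration and are not the actual Jetty handler code):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AsyncSendExample {
    private final KafkaProducer<Long, byte[]> producer;

    public AsyncSendExample(KafkaProducer<Long, byte[]> producer) {
        this.producer = producer;
    }

    public void sendAsync(String topic, long key, byte[] value) {
        // send() returns immediately; the callback runs on the producer's I/O thread
        producer.send(new ProducerRecord<>(topic, key, value), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // e.g. TimeoutException: Expiring 1 record(s) ... - log it and decide whether to retry
                    System.err.println("send failed for key " + key + ": " + exception);
                }
            }
        });
    }
}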