I did upgrade to 2.0.0 and am still seeing the same result. I am running everything on a single server (CentOS 7, 24 cores, 32 GB RAM, 1 broker, 1 ZooKeeper, single hard drive). I understand the single hard drive is less than ideal, but I still don't expect latency over 3 seconds. The program I am using is below.

Case 1: 1 partition for input and 1 partition for output, message size 10 KB. The producer gets the parameters (3600, 1000, 2), i.e. 2 messages every 1000 microseconds for 3600 seconds, which translates to 2,000 messages/s. I still see high latency, sometimes reaching 3 seconds.

Case 2: 50 partitions for input and 50 partitions for output, message size 10 KB. The producer gets (3600, 1000, 20), i.e. 20 messages every 1000 microseconds for 3600 seconds, which translates to 20,000 messages/s. The latency is not only higher, the spikes also happen more often.

Any suggestion is appreciated. The target is for each partition to handle 1,000 to 2,000 messages/s, with all end-to-end latency lower than 100 ms.
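As a sanity check on the single-drive concern, here is my own rough back-of-envelope estimate of the broker write bandwidth each case needs. This is only an illustration (class name made up), assuming 10 KiB payloads, replication factor 1, and ignoring record/batch overhead:

======disk bandwidth sanity check (illustration only)======
// Rough estimate of sustained log-write bandwidth per test case.
// Assumptions: ~10 KiB payloads, replication factor 1, no overhead counted.
public class BandwidthEstimate {
    public static void main(String[] args) {
        long messageBytes = 10 * 1024;           // ~10 KiB per record
        long[] ratesPerSec = {2_000, 20_000};    // case 1 and case 2
        for (long rate : ratesPerSec) {
            // The stream copies input -> output, so the broker writes each
            // message at least twice (input topic + output topic).
            long mbPerSec = 2 * rate * messageBytes / 1_000_000;
            System.out.println(rate + " msg/s -> ~" + mbPerSec + " MB/s of log writes");
        }
    }
}

Case 1 works out to roughly 40 MB/s, but case 2 to roughly 400 MB/s once both topics are written, which is at or beyond what a single drive typically sustains, so case 2 stressing the disk would not be surprising.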
====build.gradle======

plugins {
    id 'application'
    id 'java'
}

group 'com.bofa'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.8
mainClassName = "main.StreamApp"

repositories {
    mavenCentral()
}

dependencies {
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.0.0'
    compile group: 'org.apache.kafka', name: 'kafka-streams', version: '2.0.0'
    compile group: 'io.dropwizard.metrics', name: 'metrics-core', version: '3.2.6'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

========producer========

package main;

import java.util.Properties;
import Util.BusyTimer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {

    public static void main(String[] args) {
        final int time = Integer.valueOf(args[0]);      // run duration in seconds
        final long interval = Integer.valueOf(args[1]); // spacing between batches, microseconds
        final int batch = Integer.valueOf(args[2]);     // messages per interval

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-perf-test-producer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        // 10 KiB payload of 'a' characters
        StringBuilder buffer = new StringBuilder();
        for (int i = 0; i < 10240; i++) {
            buffer.append('a');
        }
        final String value = buffer.toString();

        final long speed = 1000000 / interval; // batches per second

        Runnable task = new Runnable() {
            int sendNum = 0;

            @Override
            public void run() {
                for (int i = 0; i < batch; i++) {
                    // Prefix the payload with the send time so the consumer
                    // can compute end-to-end latency.
                    ProducerRecord<String, String> record =
                            new ProducerRecord<>("input", System.nanoTime() + "-" + value);
                    kafkaProducer.send(record);
                    sendNum++;
                }
                if (sendNum % (speed * batch) == 0) { // roughly once per second of traffic
                    System.out.println(System.currentTimeMillis() + " : " + sendNum);
                }
            }
        };

        BusyTimer timer = new BusyTimer(interval, time, task);
        timer.spaceMessageWithInterval();
    }
}
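One thing I still plan to try, following Matthias's earlier advice to check whether the spikes originate on the write path rather than the read path: time each send with a callback. This is only a sketch that would slot into SimpleProducer (the method name, threshold, and log text are made up, and I have not run it yet):

======producer ack timing (sketch, not in the program above)======
// Sketch: wrap send() with a callback so producer->broker ack latency can be
// separated from the end-to-end latency SimpleConsumer reports.
static void sendTimed(KafkaProducer<String, String> producer,
                      ProducerRecord<String, String> record) {
    final long start = System.nanoTime();
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            long ackMs = (System.nanoTime() - start) / 1_000_000;
            if (ackMs > 100) { // only report acks slower than the 100 ms target
                System.out.println("slow ack: " + ackMs + " ms, partition "
                        + metadata.partition() + ", offset " + metadata.offset());
            }
        }
    });
}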
============kafka stream=============

package main;

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class StreamApp {

    public static void main(String[] args) {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-stream");
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "simple_stream_1");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
                + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
        streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
                + ProducerConfig.LINGER_MS_CONFIG, "5");
        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

        // Pass-through topology: read "input", write "output" unchanged.
        StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> inputStream = builder.stream(
                "input",
                Consumed.with(new Serdes.StringSerde(), new Serdes.StringSerde())
        );
        inputStream.to(
                "output",
                Produced.with(new Serdes.StringSerde(), new Serdes.StringSerde())
        );

        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
        streams.start();
    }
}
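Also, per Guozhang's suggestion further down this thread, I am going to register a state listener before streams.start(), so I can tell whether any spike lines up with a REBALANCING -> RUNNING transition. A minimal sketch (the log line is just illustrative):

======streams state listener (sketch, not in the program above)======
// Sketch: log Streams state transitions so latency spikes can be matched
// against rebalances. Must be registered before streams.start().
streams.setStateListener((newState, oldState) ->
        System.out.println(System.currentTimeMillis()
                + " streams state: " + oldState + " -> " + newState));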
=============consumer============================

package main;

import java.util.Collections;
import java.util.Properties;
import com.codahale.metrics.Reservoir;
import com.codahale.metrics.UniformReservoir;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {

    public static void main(String[] args) {
        int expectedSpeed = Integer.valueOf(args[0]); // print stats every N records

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-perf-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("output"));
        consumer.poll(0); // trigger the initial partition assignment

        int recNum = 0;
        Reservoir totalRes = new UniformReservoir();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                // Payload is "<send nanoTime>-aaaa..."; producer and consumer
                // run on the same box, so nanoTime values are assumed comparable.
                long sendTime = Long.valueOf(record.value().split("-")[0]);
                long takeTime = System.nanoTime() - sendTime;
                if (recNum > 20000) { // skip warm-up records
                    totalRes.update(takeTime);
                }
                recNum++;
                if (recNum % expectedSpeed == 0) {
                    // all values printed in milliseconds
                    System.out.println("===============" + recNum + "============");
                    System.out.println(" mean:  " + totalRes.getSnapshot().getMean() / 1000000);
                    System.out.println(" 75%:   " + totalRes.getSnapshot().get75thPercentile() / 1000000);
                    System.out.println(" 99%:   " + totalRes.getSnapshot().get99thPercentile() / 1000000);
                    System.out.println(" 99.9%: " + totalRes.getSnapshot().get999thPercentile() / 1000000);
                    System.out.println(" Max:   " + totalRes.getSnapshot().getMax() / 1000000);
                    System.out.println("========================================");
                    totalRes = new UniformReservoir();
                }
            }
        }
    }
}

==========busy timer=====================

// Idea: space the messages at a fixed interval (like Thread.sleep, but sleep
// is less accurate, so busy-wait instead).
package Util;

import java.util.ArrayList;

public class BusyTimer {

    long interval;            // nanoseconds between task runs
    long duration;            // total run time in nanoseconds
    ArrayList<Long> pubTime;  // recorded publish times (not currently read)
    Runnable task;

    public BusyTimer(long microInterval, long exDurationInSeconds, Runnable task) {
        pubTime = new ArrayList<Long>((int) (exDurationInSeconds * 1000 * 1000 / microInterval + 1));
        this.interval = microInterval * 1000;
        this.duration = exDurationInSeconds * 1000000000L;
        this.task = task;
    }

    private void busywaitUntil(long nano) {
        while (System.nanoTime() < nano) {
            // spin
        }
    }

    public void spaceMessageWithInterval() {
        long baseTime = System.nanoTime();
        long doneTime = baseTime + duration;
        while (true) {
            task.run();
            pubTime.add(System.nanoTime());
            long targetTime = System.nanoTime() + interval;
            if (System.nanoTime() > doneTime) {
                break;
            }
            busywaitUntil(targetTime);
        }
    }
}

On Fri, Aug 24, 2018 at 3:37 PM Nan Xu <nanxu1...@gmail.com> wrote:

> Looks really promising but after upgrade, still show the same result. I will post the program soon. Maybe you can see where the problem could be.
>
> Nan
>
> On Thu, Aug 23, 2018, 7:34 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hello Nan,
>>
>> Kafka does not tie up the processing thread to do disk flushing. However, since you are on an older version of Kafka I suspect you're bumping into some old issues that have been resolved in later versions. e.g.
>>
>> https://issues.apache.org/jira/browse/KAFKA-4614
>>
>> I'd suggest you upgrading to latest version (2.0.0) and try again to see if you observe the same pattern.
>>
>> Guozhang
>>
>> On Thu, Aug 23, 2018 at 3:22 PM, Sudhir Babu Pothineni <sbpothin...@gmail.com> wrote:
>>
>> > I will wait for the expert’s opinion:
>> >
>> > Did the Transparent Huge Pages(THP) disabled on the broker machine? it’s a Linux kernel parameter.
>> >
>> > -Sudhir
>> >
>> > > On Aug 23, 2018, at 4:46 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>> > >
>> > > I think I found where the problem is, how to solve and why, still not sure.
>> > >
>> > > it related to disk (maybe flushing?). I did a single machine, single node, single topic and single partition setup. producer pub as 2000 message/s, 10K size message size. and single key.
>> > >
>> > > when I save kafka log to the memory based partition, I don't see a latency over 100ms. top around 70ms.
>> > > when I save to a ssd hard drive. I do see latency spike, sometime over 1s.
>> > >
>> > > adjust the log.flush.interval.messages / log.flush.interval.ms has impact, but only to make thing worse... need suggestion.
>> > >
>> > > I think log flushing is totally async and done by OS in the default setting. does kafka has to wait when flushing data to disk?
>> > >
>> > > Thanks,
>> > > Nan
>> > >
>> > >> On Wed, Aug 22, 2018 at 11:37 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> > >>
>> > >> Given your application code:
>> > >>
>> > >> ----------------------------
>> > >>
>> > >> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
>> > >>     localDeltaTopic,
>> > >>     Consumed.with(
>> > >>         new Serdes.StringSerde(),
>> > >>         new NodeMutationSerde<>()
>> > >>     )
>> > >> );
>> > >>
>> > >> KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
>> > >>     (mutation) -> NodeState
>> > >>         .newBuilder()
>> > >>         .setMeta(
>> > >>             mutation.getMetaMutation().getMeta()
>> > >>         )
>> > >>         .setValue(
>> > >>             mutation.getValueMutation().getValue()
>> > >>         )
>> > >>         .build()
>> > >> );
>> > >>
>> > >> localHistStream.to(
>> > >>     localHistTopic,
>> > >>     Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
>> > >> );
>> > >>
>> > >> ----------------------------
>> > >>
>> > >> which is pure stateless, committing will not touch on an state directory at all. Hence committing only involves committing offsets to Kafka.
>> > >>
>> > >> Guozhang
>> > >>
>> > >>> On Wed, Aug 22, 2018 at 8:11 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>> > >>>
>> > >>> I was suspecting that too, but I also noticed the spike is not spaced around 10s. to further prove it. I put kafka data directory in a memory based directory. it still has such latency spikes. I am going to test it on a single broker, single partition env. will report back soon.
>> > >>>
>> > >>> On Wed, Aug 22, 2018 at 5:14 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> > >>>
>> > >>>> Hello Nan,
>> > >>>>
>> > >>>> Thanks for the detailed information you shared. When Kafka Streams is normally running, no rebalances should be triggered unless some of the instances (in your case, docker containers) have soft failures.
>> > >>>>
>> > >>>> I suspect the latency spike is due to the commit intervals: streams will try to commit its offset at a regular paces, which may increase latency. It is controlled by the "commit.interval.ms" config value. I saw that in your original email you've set it to 10 * 1000 (10 seconds). Is that aligned with the frequency you observe latency spikes?
>> > >>>>
>> > >>>> Guozhang
>> > >>>>
>> > >>>>> On Sun, Aug 19, 2018 at 10:41 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>> > >>>>>
>> > >>>>> did more test and and make the test case simple.
>> > >>>>> all the setup now is a single physical machine. running 3 docker instance. a1, a2, a3
>> > >>>>>
>> > >>>>> kafka + zookeeper running on all of those docker containers.
>> > >>>>> producer running on a1, send a single key, update speed 2000 message/s, each message is 10K size.
>> > >>>>> 3 consumer(different group) are running. one on each docker.
>> > >>>>> all topics are pre-created.
>> > >>>>> in startup, I do see some latency greater than 100ms, which is fine. and then everything is good. latency is low and consumer don't see anything over 100ms for a while.
>> > >>>>> then I see a few messages have latency over 100ms. then back to normal, then happen again..... do seems like gc problem. but I check the gc log. I don't think it can cause over 100ms. (both are G1 collector)
>> > >>>>>
>> > >>>>> after the stream stable running( exclude the startup), the first message over 100ms take 179ms and the gc ( it has a 30ms pause, but should not cause a 179ms end to end).
>> > >>>>>
>> > >>>>> FROM APP
>> > >>>>>
>> > >>>>> 2018-08-20T00:28:47.118-0500: 406.838: [GC (Allocation Failure) 3184739K->84018K(5947904K), 0.0093730 secs]
>> > >>>>> 2018-08-20T00:28:56.094-0500: 415.814: [GC (Allocation Failure) 3184690K->84280K(6053888K), 0.0087473 secs]
>> > >>>>> 2018-08-20T00:29:05.069-0500: 424.789: [GC (Allocation Failure) 3301176K->84342K(6061056K), 0.0127339 secs]
>> > >>>>> 2018-08-20T00:29:14.234-0500: 433.954: [GC (Allocation Failure) 3301238K->84624K(6143488K), 0.0140844 secs]
>> > >>>>> 2018-08-20T00:29:24.523-0500: 444.243: [GC (Allocation Failure) 3386000K->89949K(6144000K), 0.0108118 secs]
>> > >>>>>
>> > >>>>> kafka a1
>> > >>>>>
>> > >>>>> 2018-08-20T00:29:14.306-0500: 7982.657: [GC pause (G1 Evacuation Pause) (young), 0.0214200 secs]
>> > >>>>>   [Parallel Time: 17.2 ms, GC Workers: 8]
>> > >>>>>     [GC Worker Start (ms): Min: 7982657.5, Avg: 7982666.9, Max: 7982673.8, Diff: 16.3]
>> > >>>>>     [Ext Root Scanning (ms): Min: 0.0, Avg: 0.2, Max: 1.5, Diff: 1.5, Sum: 1.5]
>> > >>>>>     [Update RS (ms): Min: 0.0, Avg: 1.0, Max: 6.5, Diff: 6.5, Sum: 8.4]
>> > >>>>>       [Processed Buffers: Min: 0, Avg: 4.6, Max: 13, Diff: 13, Sum: 37]
>> > >>>>>     [Scan RS (ms): Min: 0.0, Avg: 0.9, Max: 2.0, Diff: 2.0, Sum: 7.1]
>> > >>>>>     [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
>> > >>>>>     [Object Copy (ms): Min: 0.0, Avg: 4.6, Max: 6.5, Diff: 6.5, Sum: 36.5]
>> > >>>>>     [Termination (ms): Min: 0.0, Avg: 0.4, Max: 0.9, Diff: 0.9, Sum: 2.9]
>> > >>>>>       [Termination Attempts: Min: 1, Avg: 10.4, Max: 25, Diff: 24, Sum: 83]
>> > >>>>>     [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
>> > >>>>>     [GC Worker Total (ms): Min: 0.2, Avg: 7.1, Max: 16.4, Diff: 16.2, Sum: 56.5]
>> > >>>>>     [GC Worker End (ms): Min: 7982673.9, Avg: 7982674.0, Max: 7982674.5, Diff: 0.6]
>> > >>>>>   [Code Root Fixup: 0.0 ms]
>> > >>>>>   [Code Root Purge: 0.0 ms]
>> > >>>>>   [Clear CT: 1.0 ms]
>> > >>>>>   [Other: 3.2 ms]
>> > >>>>>     [Choose CSet: 0.0 ms]
>> > >>>>>     [Ref Proc: 1.9 ms]
>> > >>>>>     [Ref Enq: 0.0 ms]
>> > >>>>>     [Redirty Cards: 0.8 ms]
>> > >>>>>     [Humongous Register: 0.1 ms]
>> > >>>>>     [Humongous Reclaim: 0.0 ms]
>> > >>>>>     [Free CSet: 0.2 ms]
>> > >>>>>   [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.5M(1024.0M)->217.9M(1024.0M)]
>> > >>>>>   [Times: user=0.05 sys=0.00, real=0.03 secs]
>> > >>>>>
>> > >>>>> 2018-08-20T00:29:16.074-0500: 7984.426: [GC pause (G1 Evacuation Pause) (young), 0.0310004 secs]
>> > >>>>>   [Parallel Time: 24.4 ms, GC Workers: 8]
>> > >>>>>     [GC Worker Start (ms): Min: 7984426.1, Avg: 7984436.8, Max: 7984444.7, Diff: 18.6]
>> > >>>>>     [Ext Root Scanning (ms): Min: 0.0, Avg: 0.3, Max: 1.9, Diff: 1.9, Sum: 2.0]
>> > >>>>>     [Update RS (ms): Min: 0.0, Avg: 4.1, Max: 11.8, Diff: 11.8, Sum: 32.9]
>> > >>>>>       [Processed Buffers: Min: 0, Avg: 5.4, Max: 25, Diff: 25, Sum: 43]
>> > >>>>>     [Scan RS (ms): Min: 0.1, Avg: 3.2, Max: 11.3, Diff: 11.2, Sum: 25.5]
>> > >>>>>     [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
>> > >>>>>     [Object Copy (ms): Min: 0.0, Avg: 4.1, Max: 6.9, Diff: 6.9, Sum: 32.7]
>> > >>>>>     [Termination (ms): Min: 0.0, Avg: 0.8, Max: 1.6, Diff: 1.6, Sum: 6.8]
>> > >>>>>       [Termination Attempts: Min: 1, Avg: 5.4, Max: 11, Diff: 10, Sum: 43]
>> > >>>>>     [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
>> > >>>>>     [GC Worker Total (ms): Min: 4.4, Avg: 12.5, Max: 23.6, Diff: 19.1, Sum: 100.1]
>> > >>>>>     [GC Worker End (ms): Min: 7984449.1, Avg: 7984449.3, Max: 7984449.9, Diff: 0.8]
>> > >>>>>   [Code Root Fixup: 0.0 ms]
>> > >>>>>   [Code Root Purge: 0.0 ms]
>> > >>>>>   [Clear CT: 1.1 ms]
>> > >>>>>   [Other: 5.5 ms]
>> > >>>>>     [Choose CSet: 0.0 ms]
>> > >>>>>     [Ref Proc: 2.2 ms]
>> > >>>>>     [Ref Enq: 0.0 ms]
>> > >>>>>     [Redirty Cards: 2.8 ms]
>> > >>>>>     [Humongous Register: 0.1 ms]
>> > >>>>>     [Humongous Reclaim: 0.0 ms]
>> > >>>>>     [Free CSet: 0.1 ms]
>> > >>>>>   [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.9M(1024.0M)->218.4M(1024.0M)]
>> > >>>>>   [Times: user=0.05 sys=0.00, real=0.03 secs]
>> > >>>>>
>> > >>>>> so when kafka stream running, is there any trying to rebalance? either broker rebalance or client rebalance?
>> > >>>>> any kind of test to see what cause the trouble?
>> > >>>>>
>> > >>>>> Thanks,
>> > >>>>> Nan
>> > >>>>>
>> > >>>>> On Sun, Aug 19, 2018 at 10:34 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> > >>>>>
>> > >>>>>> Okay, so you're measuring end-to-end time from producer -> broker -> streams' consumer client, there are multiple phases that can contribute to the 100ms latency, and I cannot tell if stream's consumer phase is the major contributor. For example, if the topic was not created before, then when the broker first received a produce request it may need to create the topic, which involves multiple steps including writes to ZK which could take time.
>> > >>>>>>
>> > >>>>>> There are some confusions from your description: you mentioned "Kafka cluster is already up and running", but I think you are referring to "Kafka Streams application instances are already up and running", right? Since only the latter has rebalance process, while the Kafak brokers do not really have "rebalances" except balancing load by migrating partitions.
>> > >>>>>>
>> > >>>>>> Guozhang
>> > >>>>>>
>> > >>>>>> On Sun, Aug 19, 2018 at 7:47 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>> > >>>>>>
>> > >>>>>>> right, so my kafka cluster is already up and running for a while, and I can see from the log all broker instance already change from rebalance to running.
>> > >>>>>>>
>> > >>>>>>> I did a another test.
>> > >>>>>>> from producer, right before the message get send to the broker, I put a timestamp in the message. and from the consumer side which is after stream processing, I compare this timestamp with current time. I can see some message processing time is above 100ms on some real powerful hardware. and from my application gc, all the gc time is below 1ms, kafka gc only happen once and below 1ms too.
>> > >>>>>>>
>> > >>>>>>> very puzzled. is there any communication to zookeeper, if not get response, will cause the broker to pause? I don't think that's the case but at this time don't know what else can be suspected.
>> > >>>>>>>
>> > >>>>>>> Nan
>> > >>>>>>>
>> > >>>>>>> On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> > >>>>>>>
>> > >>>>>>>> Hello Nan,
>> > >>>>>>>>
>> > >>>>>>>> Note that Streams may need some time to rebalance and assign tasks even if you only starts with one instance.
>> > >>>>>>>>
>> > >>>>>>>> I'd suggest you register your state listener in Kafka Streams via KafkaStreams#setStateListener, and your customized StateListener should record when the state transits from REBALANCING to RUNNING since only after that the streams client will start to process the first record.
>> > >>>>>>>>
>> > >>>>>>>> Guozhang
>> > >>>>>>>>
>> > >>>>>>>> On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>> > >>>>>>>>
>> > >>>>>>>>> thanks, which JMX properties indicate "processing latency spikes" / "throughput"
>> > >>>>>>>>>
>> > >>>>>>>>> On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <matth...@confluent.io> wrote:
>> > >>>>>>>>>
>> > >>>>>>>>>> I cannot spot any obvious reasons.
>> > >>>>>>>>>>
>> > >>>>>>>>>> As you consume from the result topic for verification, we should verify that the latency spikes original on write and not on read: you might want to have a look into Kafka Streams JMX metric to see if processing latency spikes or throughput drops.
>> > >>>>>>>>>>
>> > >>>>>>>>>> Also watch for GC pauses in the JVM.
>> > >>>>>>>>>>
>> > >>>>>>>>>> Hope this helps.
>> > >>>>>>>>>>
>> > >>>>>>>>>> -Matthias
>> > >>>>>>>>>>
>> > >>>>>>>>>>> On 8/17/18 12:13 PM, Nan Xu wrote:
>> > >>>>>>>>>>> btw, I am using version 0.10.2.0
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com> wrote:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> I am working on a kafka stream app, and see huge latency variance, wondering what can cause this?
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> the processing is very simple and don't have state, linger.ms already change to 5ms. the message size is around 10K byes and published as 2000 messages/s, network is 10G. using a regular consumer watch the localHistTopic topic and just every 2000 message print out a counter, it usually every second I get a count 2000 as the publish speed, but sometime I see it stall for 3 or more seconds and then print out a few count. like cpu is paused during that time or message being cache/batch then processed. any suggestion?
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> final Properties streamsConfiguration = new Properties();
>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
>> > >>>>>>>>>>>> // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.BATCH_SIZE_CONFIG, 163840);
>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.LINGER_MS_CONFIG, "5");
>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 20 * 1024 * 1024);
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> final StreamsBuilder builder = new StreamsBuilder();
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
>> > >>>>>>>>>>>>     localDeltaTopic,
>> > >>>>>>>>>>>>     Consumed.with(
>> > >>>>>>>>>>>>         new Serdes.StringSerde(),
>> > >>>>>>>>>>>>         new NodeMutationSerde<>()
>> > >>>>>>>>>>>>     )
>> > >>>>>>>>>>>> );
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
>> > >>>>>>>>>>>>     (mutation) -> NodeState
>> > >>>>>>>>>>>>         .newBuilder()
>> > >>>>>>>>>>>>         .setMeta(
>> > >>>>>>>>>>>>             mutation.getMetaMutation().getMeta()
>> > >>>>>>>>>>>>         )
>> > >>>>>>>>>>>>         .setValue(
>> > >>>>>>>>>>>>             mutation.getValueMutation().getValue()
>> > >>>>>>>>>>>>         )
>> > >>>>>>>>>>>>         .build()
>> > >>>>>>>>>>>> );
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> localHistStream.to(
>> > >>>>>>>>>>>>     localHistTopic,
>> > >>>>>>>>>>>>     Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
>> > >>>>>>>>>>>> );
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> streams = new KafkaStreams(builder.build(), streamsConfiguration);
>> > >>>>>>>>>>>> streams.cleanUp();
>> > >>>>>>>>>>>> streams.start();
>> > >>>>>>>>>>>
>> > >>>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> --
>> > >>>>>>>> -- Guozhang
>> > >>>>>>>
>> > >>>>>>
>> > >>>>>> --
>> > >>>>>> -- Guozhang
>> > >>>>>
>> > >>>>
>> > >>>> --
>> > >>>> -- Guozhang
>> > >>>
>> > >>
>> > >> --
>> > >> -- Guozhang
>> >
>>
>> --
>> -- Guozhang
>>
>