It may be easier to view the code on GitHub:

 https://github.com/angelfox123/kperf


On Sat, Aug 25, 2018 at 8:43 PM Nan Xu <nanxu1...@gmail.com> wrote:

> So I did upgrade to 2.0.0 and am still seeing the same result. Below is the
> program I am using. I am running everything on a single server (CentOS 7,
> 24 cores, 32 GB RAM, 1 broker, 1 ZooKeeper, single hard drive). I understand
> that a single hard drive is less than ideal, but I still don't expect latency
> to exceed 3 seconds.
>
>
> Case 1.
> I create 1 partition for input and 1 partition for output; message size is
> 10K.
> The producer is given parameters (3600, 1000, 2)   // 2 messages per 1000
> microseconds for 3600 seconds, which translates to 2,000 messages/s. I still
> see latency spikes that sometimes reach 3 seconds.
>
> Case 2.
> 50 partitions for input and 50 partitions for output; message size is 10K.
> The producer is given parameters (3600, 1000, 20)   // 20 messages per 1000
> microseconds for 3600 seconds, which translates to 20,000 messages/s. Latency
> is not only high, but the spikes also happen more often.
>
>
> Any suggestions are appreciated. The target is for each partition to handle
> 1,000 -- 2,000 messages/s with all latencies lower than 100 ms.
>
> ====build.gradle======
> plugins {
>     id 'application'
>     id 'java'
> }
> group 'com.bofa'
> version '1.0-SNAPSHOT'
> sourceCompatibility = 1.8
> mainClassName="main.StreamApp"
>
> repositories {
>     mavenCentral()
> }
>
> dependencies {
>     compile group: 'org.apache.kafka', name: 'kafka-clients', version:
> '2.0.0'
>     compile group: "org.apache.kafka", name: "kafka-streams", version:
> "2.0.0"
>     compile group: 'io.dropwizard.metrics', name: 'metrics-core',
> version:'3.2.6'
>     testCompile group: 'junit', name: 'junit', version: '4.12'
> }
>
> ========producer========
> package main;
>
> import java.util.Properties;
> import java.util.concurrent.atomic.AtomicInteger;
>
> import Util.BusyTimer;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.Serde;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> public class SimpleProducer {
>     public static void main(String[] args) {
> final int time =Integer.valueOf(args[0]);
> final long interval = Integer.valueOf(args[1]);
> final int batch =Integer.valueOf(args[2]);
>         Properties props = new Properties();
>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
>         props.put(ProducerConfig.CLIENT_ID_CONFIG,
> "kafka-perf-test-producer");
>         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> StringSerializer.class);
>         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> StringSerializer.class);
>
>         KafkaProducer<String,String> kafkaProducer = new
> KafkaProducer(props);
>
>         StringBuffer buffer = new StringBuffer();
>         for(int i=0; i<10240; i++) {
>             buffer.append('a');
>         }
>         String value = buffer.toString();
>
>         final long speed = 1000000/interval;
>         Runnable task = new Runnable() {
>             int sendNum=0;
>             @Override
>             public void run() {
>
>                 for(int i=0; i<batch; i++) {
>                     ProducerRecord<String, String> record = new
> ProducerRecord<>("input",  System.nanoTime() + "-" + value);
>                     kafkaProducer.send(record);
>                     sendNum++;
>                 }
>
>                 if(sendNum % (speed * batch) == 0){
>                     System.out.println(System.currentTimeMillis() + " : "
> + sendNum);
>                 }
>             }
>         };
>
>         BusyTimer timer = new BusyTimer(interval,time, task);
>         timer.spaceMessageWithInterval();
>     }
> }
>
>
> ============kafka stream=============
> package main;
>
> import java.util.Properties;
>
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.Consumed;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.Produced;
>
> public class StreamApp {
>     public static void main(String[] args) {
>         final Properties streamsConfiguration = new Properties();
>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> "simple-stream");
>         streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG,
> "simple_stream_1");
>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
>
> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> Serdes.String()
>             .getClass().getName());
>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>             + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>             + ProducerConfig.LINGER_MS_CONFIG,"5");
>
> streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,2);
>
>
>         StreamsBuilder builder = new StreamsBuilder();
>         final KStream<String, String> inputStream = builder.stream(
>             "input",
>             Consumed.with(
>                 new Serdes.StringSerde(),
>                 new Serdes.StringSerde()
>             )
>         );
>
>         inputStream.to(
>             "output",
>             Produced.with(new Serdes.StringSerde(), new
> Serdes.StringSerde())
>         );
>
>         KafkaStreams streams = new KafkaStreams(builder.build(),
> streamsConfiguration);
>         streams.start();
>     }
> }
>
> =============consumer============================
> package main;
>
> import java.util.Collections;
> import java.util.Properties;
>
> import com.codahale.metrics.Reservoir;
> import com.codahale.metrics.UniformReservoir;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.serialization.StringDeserializer;
>
> public class SimpleConsumer {
>     public static void main(String[] args) {
> int expectedSpeed = args[0];
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
>         props.put(ConsumerConfig.GROUP_ID_CONFIG,
> "kafka-perf-consumer-group");
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> StringDeserializer.class.getName());
>         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
>         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
>
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> StringDeserializer.class.getName());
>
>         KafkaConsumer consumer = new KafkaConsumer<String, String>(props);
>         consumer.subscribe(Collections.singletonList("output"));
>
>         consumer.poll(0);
>         int recNum=0;
>
>         Reservoir totalRes = new UniformReservoir();
>
>         while (true) {
>             ConsumerRecords<String, String> records = consumer.poll(10);
>             for(ConsumerRecord<String,String> record : records){
>                 long sendTime = Long.valueOf(record.value().split("-")[0]);
>                 long takeTime = System.nanoTime() - sendTime;
>                 if(recNum> 20000) {
>                     totalRes.update(takeTime);
>                 }
>                 recNum++;
>
>                 if(recNum % expectedSpeed == 0){
>                     System.out.println("==============="+ recNum +
> "============");
>                     System.out.println("  mean: " +
> totalRes.getSnapshot().getMean()/1000000);
>                     System.out.println("  75%: " +
> totalRes.getSnapshot().get75thPercentile()/1000000);
>                     System.out.println("  99%: " +
> totalRes.getSnapshot().get99thPercentile()/1000000);
>                     System.out.println("  99.9%: " +
> totalRes.getSnapshot().get999thPercentile()/1000000);
>                     System.out.println("  Max: " +
> totalRes.getSnapshot().getMax()/1000000);
>
> System.out.println("========================================");
>                     totalRes = new UniformReservoir();
>                 }
>             };
>         }
>     }
> }
>
> ==========busy timer=====================
> // The idea is to space the messages at a fixed interval (like Thread.sleep,
> // but sleep is less accurate).
> package Util;
>
> import java.util.ArrayList;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.atomic.AtomicInteger;
> import java.util.concurrent.atomic.AtomicLong;
>
> public class BusyTimer {
>     long interval;
>     long duration;
>     ArrayList<Long> pubTime;
>     ExecutorService ex = Executors.newSingleThreadExecutor();
>     Runnable task;
>
>
>     public BusyTimer(long microInterval, long exDurationInSeconds,
> Runnable task){
>         pubTime = new ArrayList<Long>((int)(exDurationInSeconds * 1000 *
> 1000 / microInterval+1));
>
>         this.interval = microInterval * 1000;
>         this.duration = exDurationInSeconds * 1000000000;
>         this.task = task;
>
>     }
>
>     private void busywaitUntil(long nano){
>         while(System.nanoTime() < nano){
>
>         }
>     }
>
>     public void spaceMessageWithInterval(){
>         int i =0 ;
>         long baseTime = System.nanoTime();
>         long doneTime = baseTime + duration;
>         while(true) {
>             task.run();
>             pubTime.add(System.nanoTime());
>             long targetTime = System.nanoTime() + interval;
>             if(System.nanoTime() > doneTime ){
>                 break;
>             }
>             busywaitUntil(targetTime);
>         }
>     }
> }
>
>
>
> On Fri, Aug 24, 2018 at 3:37 PM Nan Xu <nanxu1...@gmail.com> wrote:
>
>> It looks really promising, but after the upgrade it still shows the same
>> result. I will post the program soon. Maybe you can see where the problem
>> could be.
>>
>> Nan
>>
>> On Thu, Aug 23, 2018, 7:34 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hello Nan,
>>>
>>> Kafka does not tie up the processing thread to do disk flushing. However,
>>> since you are on an older version of Kafka I suspect you're bumping into
>>> some old issues that have been resolved in later versions. e.g.
>>>
>>> https://issues.apache.org/jira/browse/KAFKA-4614
>>>
>>> I'd suggest upgrading to the latest version (2.0.0) and trying again to see
>>> if you observe the same pattern.
>>>
>>> Guozhang
>>>
>>> On Thu, Aug 23, 2018 at 3:22 PM, Sudhir Babu Pothineni <
>>> sbpothin...@gmail.com> wrote:
>>>
>>> > I will wait for the expert’s opinion:
>>> >
>>> > Are Transparent Huge Pages (THP) disabled on the broker machine? It's a
>>> > Linux kernel parameter.
>>> >
>>> > -Sudhir
>>> >
>>> > > On Aug 23, 2018, at 4:46 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>>> > >
>>> > > I think I found where the problem is; how to solve it, and why it
>>> > > happens, I am still not sure.
>>> > >
>>> > > It is related to disk (maybe flushing?). I did a single-machine,
>>> > > single-node, single-topic, single-partition setup. The producer
>>> > > publishes 2000 messages/s with a 10K message size and a single key.
>>> > >
>>> > > When I save the Kafka log to a memory-based partition, I don't see
>>> > > latency over 100 ms; it tops out around 70 ms.
>>> > > When I save to an SSD, I do see latency spikes, sometimes over 1 s.
>>> > >
>>> > > Adjusting log.flush.interval.messages / log.flush.interval.ms has an
>>> > > impact, but only makes things worse... I need suggestions.
>>> > >
>>> > > I think log flushing is totally async and done by the OS in the default
>>> > > setting. Does Kafka have to wait when flushing data to disk?
>>> > >
>>> > > Thanks,
>>> > > Nan
>>> > >
>>> > >
>>> > >
>>> > >> On Wed, Aug 22, 2018 at 11:37 PM Guozhang Wang <wangg...@gmail.com>
>>> > wrote:
>>> > >>
>>> > >> Given your application code:
>>> > >>
>>> > >> ----------------------------
>>> > >>
>>> > >> final KStream<String, NodeMutation> localDeltaStream =
>>> builder.stream(
>>> > >>
>>> > >>            localDeltaTopic,
>>> > >>
>>> > >>            Consumed.with(
>>> > >>
>>> > >>                new Serdes.StringSerde(),
>>> > >>
>>> > >>                new NodeMutationSerde<>()
>>> > >>
>>> > >>            )
>>> > >>
>>> > >>        );
>>> > >>
>>> > >>  KStream<String, NodeState> localHistStream =
>>> > localDeltaStream.mapValues(
>>> > >>
>>> > >>            (mutation) -> NodeState
>>> > >>
>>> > >>                .newBuilder()
>>> > >>
>>> > >>                .setMeta(
>>> > >>
>>> > >>                    mutation.getMetaMutation().getMeta()
>>> > >>
>>> > >>                )
>>> > >>
>>> > >>                .setValue(
>>> > >>
>>> > >>                    mutation.getValueMutation().getValue()
>>> > >>
>>> > >>                )
>>> > >>
>>> > >>                .build()
>>> > >>
>>> > >>        );
>>> > >>
>>> > >>  localHistStream.to(
>>> > >>
>>> > >>            localHistTopic,
>>> > >>
>>> > >>            Produced.with(new Serdes.StringSerde(), new
>>> > NodeStateSerde<>())
>>> > >>
>>> > >>        );
>>> > >>
>>> > >> ----------------------------
>>> > >>
>>> > >> which is purely stateless, committing will not touch any state
>>> > >> directory at all. Hence committing only involves committing offsets to
>>> > >> Kafka.
>>> > >>
>>> > >>
>>> > >> Guozhang
>>> > >>
>>> > >>
>>> > >>> On Wed, Aug 22, 2018 at 8:11 PM, Nan Xu <nanxu1...@gmail.com>
>>> wrote:
>>> > >>>
>>> > >>> I was suspecting that too, but I also noticed the spike is not
>>> spaced
>>> > >>> around 10s. to further prove it. I put kafka data directory in a
>>> memory
>>> > >>> based directory.  it still has such latency spikes.  I am going to
>>> test
>>> > >> it
>>> > >>> on a single broker, single partition env.  will report back soon.
>>> > >>>
>>> > >>> On Wed, Aug 22, 2018 at 5:14 PM Guozhang Wang <wangg...@gmail.com>
>>> > >> wrote:
>>> > >>>
>>> > >>>> Hello Nan,
>>> > >>>>
>>> > >>>> Thanks for the detailed information you shared. When Kafka
>>> Streams is
>>> > >>>> normally running, no rebalances should be triggered unless some
>>> of the
>>> > >>>> instances (in your case, docker containers) have soft failures.
>>> > >>>>
>>> > >>>> I suspect the latency spike is due to the commit interval: Streams
>>> > >>>> will try to commit its offsets at a regular pace, which may increase
>>> > >>>> latency. It is controlled by the "commit.interval.ms" config value. I
>>> > >>>> saw that in your original email you've set it to 10 * 1000 (10
>>> > >>>> seconds). Is that aligned with the frequency at which you observe
>>> > >>>> latency spikes?
>>> > >>>>
>>> > >>>>
>>> > >>>> Guozhang
>>> > >>>>
>>> > >>>>
>>> > >>>>> On Sun, Aug 19, 2018 at 10:41 PM, Nan Xu <nanxu1...@gmail.com>
>>> > wrote:
>>> > >>>>>
>>> > >>>>> did more test and and make the test case simple.
>>> > >>>>> all the setup now is a single physical machine. running 3 docker
>>> > >>>> instance.
>>> > >>>>> a1, a2, a3
>>> > >>>>>
>>> > >>>>> kafka + zookeeper running on all of those docker containers.
>>> > >>>>> producer running on a1, send a single key,  update speed 2000
>>> > >>> message/s,
>>> > >>>>> each message is 10K size.
>>> > >>>>> 3 consumer(different group)  are running. one on each docker.
>>> > >>>>> all topics are pre-created.
>>> > >>>>> in startup, I do see some latency greater than 100ms, which is
>>> fine.
>>> > >>> and
>>> > >>>>> then everything is good. latency is low and consumer don't see
>>> > >> anything
>>> > >>>>> over 100ms for a while.
>>> > >>>>> then I see a few messages have latency over 100ms. then back to
>>> > >> normal,
>>> > >>>>> then happen again..... do seems like gc problem. but I check the
>>> gc
>>> > >>>> log.  I
>>> > >>>>> don't think it can cause over 100ms. (both are G1 collector)
>>> > >>>>>
>>> > >>>>> after the stream stable running( exclude the startup), the first
>>> > >>> message
>>> > >>>>> over 100ms take 179ms  and the gc ( it has a 30ms pause, but
>>> should
>>> > >> not
>>> > >>>>> cause a 179ms end to end).
>>> > >>>>>
>>> > >>>>> FROM APP
>>> > >>>>>
>>> > >>>>> 2018-08-20T00:28:47.118-0500: 406.838: [GC (Allocation Failure)
>>> > >>>>> 3184739K->84018K(5947904K), 0.0093730 secs]
>>> > >>>>>
>>> > >>>>> 2018-08-20T00:28:56.094-0500: 415.814: [GC (Allocation Failure)
>>> > >>>>> 3184690K->84280K(6053888K), 0.0087473 secs]
>>> > >>>>>
>>> > >>>>> 2018-08-20T00:29:05.069-0500: 424.789: [GC (Allocation Failure)
>>> > >>>>> 3301176K->84342K(6061056K), 0.0127339 secs]
>>> > >>>>>
>>> > >>>>> 2018-08-20T00:29:14.234-0500: 433.954: [GC (Allocation Failure)
>>> > >>>>> 3301238K->84624K(6143488K), 0.0140844 secs]
>>> > >>>>>
>>> > >>>>> 2018-08-20T00:29:24.523-0500: 444.243: [GC (Allocation Failure)
>>> > >>>>> 3386000K->89949K(6144000K), 0.0108118 secs]
>>> > >>>>>
>>> > >>>>>
>>> > >>>>>
>>> > >>>>> kafka a1
>>> > >>>>>
>>> > >>>>> 2018-08-20T00:29:14.306-0500: 7982.657: [GC pause (G1 Evacuation
>>> > >> Pause)
>>> > >>>>> (young), 0.0214200 secs]
>>> > >>>>>
>>> > >>>>>   [Parallel Time: 17.2 ms, GC Workers: 8]
>>> > >>>>>
>>> > >>>>>      [GC Worker Start (ms): Min: 7982657.5, Avg: 7982666.9, Max:
>>> > >>>>> 7982673.8, Diff: 16.3]
>>> > >>>>>
>>> > >>>>>      [Ext Root Scanning (ms): Min: 0.0, Avg: 0.2, Max: 1.5, Diff:
>>> > >> 1.5,
>>> > >>>>> Sum: 1.5]
>>> > >>>>>
>>> > >>>>>      [Update RS (ms): Min: 0.0, Avg: 1.0, Max: 6.5, Diff: 6.5,
>>> Sum:
>>> > >>> 8.4]
>>> > >>>>>
>>> > >>>>>         [Processed Buffers: Min: 0, Avg: 4.6, Max: 13, Diff: 13,
>>> > >> Sum:
>>> > >>>> 37]
>>> > >>>>>
>>> > >>>>>      [Scan RS (ms): Min: 0.0, Avg: 0.9, Max: 2.0, Diff: 2.0, Sum:
>>> > >> 7.1]
>>> > >>>>>
>>> > >>>>>      [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0,
>>> Diff:
>>> > >>> 0.0,
>>> > >>>>> Sum: 0.0]
>>> > >>>>>
>>> > >>>>>      [Object Copy (ms): Min: 0.0, Avg: 4.6, Max: 6.5, Diff: 6.5,
>>> > >> Sum:
>>> > >>>>> 36.5]
>>> > >>>>>
>>> > >>>>>      [Termination (ms): Min: 0.0, Avg: 0.4, Max: 0.9, Diff: 0.9,
>>> > >> Sum:
>>> > >>>> 2.9]
>>> > >>>>>
>>> > >>>>>         [Termination Attempts: Min: 1, Avg: 10.4, Max: 25, Diff:
>>> 24,
>>> > >>>> Sum:
>>> > >>>>> 83]
>>> > >>>>>
>>> > >>>>>      [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff:
>>> 0.0,
>>> > >>>> Sum:
>>> > >>>>> 0.1]
>>> > >>>>>
>>> > >>>>>      [GC Worker Total (ms): Min: 0.2, Avg: 7.1, Max: 16.4, Diff:
>>> > >> 16.2,
>>> > >>>>> Sum: 56.5]
>>> > >>>>>
>>> > >>>>>      [GC Worker End (ms): Min: 7982673.9, Avg: 7982674.0, Max:
>>> > >>>> 7982674.5,
>>> > >>>>> Diff: 0.6]
>>> > >>>>>
>>> > >>>>>   [Code Root Fixup: 0.0 ms]
>>> > >>>>>
>>> > >>>>>   [Code Root Purge: 0.0 ms]
>>> > >>>>>
>>> > >>>>>   [Clear CT: 1.0 ms]
>>> > >>>>>
>>> > >>>>>   [Other: 3.2 ms]
>>> > >>>>>
>>> > >>>>>      [Choose CSet: 0.0 ms]
>>> > >>>>>
>>> > >>>>>      [Ref Proc: 1.9 ms]
>>> > >>>>>
>>> > >>>>>      [Ref Enq: 0.0 ms]
>>> > >>>>>
>>> > >>>>>      [Redirty Cards: 0.8 ms]
>>> > >>>>>
>>> > >>>>>      [Humongous Register: 0.1 ms]
>>> > >>>>>
>>> > >>>>>      [Humongous Reclaim: 0.0 ms]
>>> > >>>>>
>>> > >>>>>      [Free CSet: 0.2 ms]
>>> > >>>>>
>>> > >>>>>   [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K
>>> Heap:
>>> > >>>>> 265.5M(1024.0M)->217.9M(1024.0M)]
>>> > >>>>>
>>> > >>>>> [Times: user=0.05 sys=0.00, real=0.03 secs]
>>> > >>>>>
>>> > >>>>> 2018-08-20T00:29:16.074-0500: 7984.426: [GC pause (G1 Evacuation
>>> > >> Pause)
>>> > >>>>> (young), 0.0310004 secs]
>>> > >>>>>
>>> > >>>>>   [Parallel Time: 24.4 ms, GC Workers: 8]
>>> > >>>>>
>>> > >>>>>      [GC Worker Start (ms): Min: 7984426.1, Avg: 7984436.8, Max:
>>> > >>>>> 7984444.7, Diff: 18.6]
>>> > >>>>>
>>> > >>>>>      [Ext Root Scanning (ms): Min: 0.0, Avg: 0.3, Max: 1.9, Diff:
>>> > >> 1.9,
>>> > >>>>> Sum: 2.0]
>>> > >>>>>
>>> > >>>>>      [Update RS (ms): Min: 0.0, Avg: 4.1, Max: 11.8, Diff: 11.8,
>>> > >> Sum:
>>> > >>>>> 32.9]
>>> > >>>>>
>>> > >>>>>         [Processed Buffers: Min: 0, Avg: 5.4, Max: 25, Diff: 25,
>>> > >> Sum:
>>> > >>>> 43]
>>> > >>>>>
>>> > >>>>>      [Scan RS (ms): Min: 0.1, Avg: 3.2, Max: 11.3, Diff: 11.2,
>>> Sum:
>>> > >>>> 25.5]
>>> > >>>>>
>>> > >>>>>      [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0,
>>> Diff:
>>> > >>> 0.0,
>>> > >>>>> Sum: 0.0]
>>> > >>>>>
>>> > >>>>>      [Object Copy (ms): Min: 0.0, Avg: 4.1, Max: 6.9, Diff: 6.9,
>>> > >> Sum:
>>> > >>>>> 32.7]
>>> > >>>>>
>>> > >>>>>      [Termination (ms): Min: 0.0, Avg: 0.8, Max: 1.6, Diff: 1.6,
>>> > >> Sum:
>>> > >>>> 6.8]
>>> > >>>>>
>>> > >>>>>         [Termination Attempts: Min: 1, Avg: 5.4, Max: 11, Diff:
>>> 10,
>>> > >>> Sum:
>>> > >>>>> 43]
>>> > >>>>>
>>> > >>>>>      [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff:
>>> 0.0,
>>> > >>>> Sum:
>>> > >>>>> 0.1]
>>> > >>>>>
>>> > >>>>>      [GC Worker Total (ms): Min: 4.4, Avg: 12.5, Max: 23.6, Diff:
>>> > >>> 19.1,
>>> > >>>>> Sum: 100.1]
>>> > >>>>>
>>> > >>>>>      [GC Worker End (ms): Min: 7984449.1, Avg: 7984449.3, Max:
>>> > >>>> 7984449.9,
>>> > >>>>> Diff: 0.8]
>>> > >>>>>
>>> > >>>>>   [Code Root Fixup: 0.0 ms]
>>> > >>>>>
>>> > >>>>>   [Code Root Purge: 0.0 ms]
>>> > >>>>>
>>> > >>>>>   [Clear CT: 1.1 ms]
>>> > >>>>>
>>> > >>>>>   [Other: 5.5 ms]
>>> > >>>>>
>>> > >>>>>      [Choose CSet: 0.0 ms]
>>> > >>>>>
>>> > >>>>>      [Ref Proc: 2.2 ms]
>>> > >>>>>
>>> > >>>>>      [Ref Enq: 0.0 ms]
>>> > >>>>>
>>> > >>>>>      [Redirty Cards: 2.8 ms]
>>> > >>>>>
>>> > >>>>>      [Humongous Register: 0.1 ms]
>>> > >>>>>
>>> > >>>>>      [Humongous Reclaim: 0.0 ms]
>>> > >>>>>
>>> > >>>>>      [Free CSet: 0.1 ms]
>>> > >>>>>
>>> > >>>>>   [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K
>>> Heap:
>>> > >>>>> 265.9M(1024.0M)->218.4M(1024.0M)]
>>> > >>>>>
>>> > >>>>> [Times: user=0.05 sys=0.00, real=0.03 secs]
>>> > >>>>>
>>> > >>>>>
>>> > >>>>> so when kafka stream running, is there any trying to rebalance?
>>> > >> either
>>> > >>>>> broker rebalance or client rebalance?
>>> > >>>>> any kind of test to see what cause the trouble?
>>> > >>>>>
>>> > >>>>> Thanks,
>>> > >>>>> Nan
>>> > >>>>>
>>> > >>>>>
>>> > >>>>>
>>> > >>>>>
>>> > >>>>>
>>> > >>>>>
>>> > >>>>> On Sun, Aug 19, 2018 at 10:34 PM Guozhang Wang <
>>> wangg...@gmail.com>
>>> > >>>> wrote:
>>> > >>>>>
>>> > >>>>>> Okay, so you're measuring end-to-end time from producer ->
>>> broker
>>> > >> ->
>>> > >>>>>> streams' consumer client, there are multiple phases that can
>>> > >>> contribute
>>> > >>>>> to
>>> > >>>>>> the 100ms latency, and I cannot tell if stream's consumer phase
>>> is
>>> > >>> the
>>> > >>>>>> major contributor. For example, if the topic was not created
>>> > >> before,
>>> > >>>> then
>>> > >>>>>> when the broker first received a produce request it may need to
>>> > >>> create
>>> > >>>>> the
>>> > >>>>>> topic, which involves multiple steps including writes to ZK
>>> which
>>> > >>> could
>>> > >>>>>> take time.
>>> > >>>>>>
>>> > >>>>>> There is some confusion in your description: you mentioned "Kafka
>>> > >>>>>> cluster is already up and running", but I think you are referring to
>>> > >>>>>> "Kafka Streams application instances are already up and running",
>>> > >>>>>> right? Only the latter has a rebalance process, while the Kafka
>>> > >>>>>> brokers do not really have "rebalances" except balancing load by
>>> > >>>>>> migrating partitions.
>>> > >>>>>>
>>> > >>>>>> Guozhang
>>> > >>>>>>
>>> > >>>>>>
>>> > >>>>>>
>>> > >>>>>> On Sun, Aug 19, 2018 at 7:47 PM, Nan Xu <nanxu1...@gmail.com>
>>> > >> wrote:
>>> > >>>>>>
>>> > >>>>>>> right, so my kafka cluster is already up and running for a
>>> while,
>>> > >>>> and I
>>> > >>>>>> can
>>> > >>>>>>> see from the log all broker instance already change from
>>> > >> rebalance
>>> > >>> to
>>> > >>>>>>> running.
>>> > >>>>>>>
>>> > >>>>>>> I did a another test.
>>> > >>>>>>> from producer, right before the message get send to the
>>> broker, I
>>> > >>>> put a
>>> > >>>>>>> timestamp in the message. and from the consumer side which is
>>> > >> after
>>> > >>>>>> stream
>>> > >>>>>>> processing, I compare this timestamp with current time. I can
>>> see
>>> > >>>> some
>>> > >>>>>>> message processing time is above 100ms on some real powerful
>>> > >>>> hardware.
>>> > >>>>>> and
>>> > >>>>>>> from my application gc, all the gc time is below 1ms, kafka gc
>>> > >> only
>>> > >>>>>> happen
>>> > >>>>>>> once and below 1ms too.
>>> > >>>>>>>
>>> > >>>>>>> very puzzled. is there any communication to zookeeper, if not
>>> get
>>> > >>>>>> response,
>>> > >>>>>>> will cause the broker to pause? I don't think that's the case
>>> but
>>> > >>> at
>>> > >>>>> this
>>> > >>>>>>> time don't know what else can be suspected.
>>> > >>>>>>>
>>> > >>>>>>> Nan
>>> > >>>>>>>
>>> > >>>>>>> On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang <
>>> > >> wangg...@gmail.com>
>>> > >>>>>> wrote:
>>> > >>>>>>>
>>> > >>>>>>>> Hello Nan,
>>> > >>>>>>>>
>>> > >>>>>>>> Note that Streams may need some time to rebalance and assign tasks
>>> > >>>>>>>> even if you only start with one instance.
>>> > >>>>>>>>
>>> > >>>>>>>> I'd suggest you register your state listener in Kafka Streams
>>> > >> via
>>> > >>>>>>>> KafkaStreams#setStateListener, and your customized
>>> > >> StateListener
>>> > >>>>> should
>>> > >>>>>>>> record when the state transits from REBALANCING to RUNNING
>>> > >> since
>>> > >>>> only
>>> > >>>>>>> after
>>> > >>>>>>>> that the streams client will start to process the first
>>> record.
>>> > >>>>>>>>
>>> > >>>>>>>>
>>> > >>>>>>>> Guozhang
>>> > >>>>>>>>
>>> > >>>>>>>>
>>> > >>>>>>>> On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu <nanxu1...@gmail.com>
>>> > >>>> wrote:
>>> > >>>>>>>>
>>> > >>>>>>>>> thanks, which JMX properties indicate  "processing latency
>>> > >>>>> spikes"  /
>>> > >>>>>>>>> "throughput"
>>> > >>>>>>>>>
>>> > >>>>>>>>>
>>> > >>>>>>>>> On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <
>>> > >>>>>> matth...@confluent.io
>>> > >>>>>>>>
>>> > >>>>>>>>> wrote:
>>> > >>>>>>>>>
>>> > >>>>>>>>>> I cannot spot any obvious reasons.
>>> > >>>>>>>>>>
>>> > >>>>>>>>>> As you consume from the result topic for verification, we should
>>> > >>>>>>>>>> verify that the latency spikes originate on write and not on
>>> > >>>>>>>>>> read: you might want to have a look at the Kafka Streams JMX
>>> > >>>>>>>>>> metrics to see if processing latency spikes or throughput drops.
>>> > >>>>>>>>>>
>>> > >>>>>>>>>> Also watch for GC pauses in the JVM.
>>> > >>>>>>>>>>
>>> > >>>>>>>>>> Hope this helps.
>>> > >>>>>>>>>>
>>> > >>>>>>>>>>
>>> > >>>>>>>>>> -Matthias
>>> > >>>>>>>>>>
>>> > >>>>>>>>>>> On 8/17/18 12:13 PM, Nan Xu wrote:
>>> > >>>>>>>>>>> btw, I am using version 0.10.2.0
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>> On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <
>>> > >>> nanxu1...@gmail.com>
>>> > >>>>>>> wrote:
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>>> I am working on a kafka stream app, and see huge latency
>>> > >>>>>> variance,
>>> > >>>>>>>>>>>> wondering what can cause this?
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> The processing is very simple and has no state; linger.ms is
>>> > >>>>>>>>>>>> already changed to 5 ms. The message size is around 10K bytes
>>> > >>>>>>>>>>>> and messages are published at 2000 messages/s; the network is
>>> > >>>>>>>>>>>> 10G. I use a regular consumer to watch the localHistTopic topic
>>> > >>>>>>>>>>>> and print out a counter every 2000 messages. Usually I get a
>>> > >>>>>>>>>>>> count of 2000 every second, matching the publish speed, but
>>> > >>>>>>>>>>>> sometimes I see it stall for 3 or more seconds and then print
>>> > >>>>>>>>>>>> out a few counts at once, as if the CPU were paused during that
>>> > >>>>>>>>>>>> time or messages were being cached/batched and then processed.
>>> > >>>>>>>>>>>> Any suggestions?
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>  final Properties streamsConfiguration = new
>>> > >>> Properties();
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>
>>> > >>>>>>>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>> > >>>>>>>>>>>> applicationId);
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>
>>> > >> streamsConfiguration.put(StreamsConfig.CLIENT_ID_
>>> > >>>>> CONFIG,
>>> > >>>>>>>>>> clientId);
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>
>>> > >> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_
>>> > >>>>>>>>> SERVERS_CONFIG,
>>> > >>>>>>>>>>>> bootstrapServers);
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_
>>> > >>>>>>>>> SERDE_CLASS_CONFIG,
>>> > >>>>>>>>>>>> Serdes.String()
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>            .getClass().getName());
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
>>> > >>>>> MS_CONFIG,
>>> > >>>>>>>>>>>> 10 * 1000);
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> //
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>
>>> > >>>>>>>>
>>> > >>>>>>
>>> > >>>>
>>> > >> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_
>>> > BUFFERING_CONFIG,
>>> > >>>>>>>>> 0);
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>        streamsConfiguration.put(
>>> > >>>>> StreamsConfig.PRODUCER_PREFIX
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>            + ProducerConfig.BATCH_SIZE_CONFIG,163840);
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>        streamsConfiguration.put(
>>> > >>>>> StreamsConfig.PRODUCER_PREFIX
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>            + ProducerConfig.BUFFER_MEMORY_CONFIG,
>>> > >>>>> 335544320);
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>        streamsConfiguration.put(
>>> > >>>>> StreamsConfig.PRODUCER_PREFIX
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>            + ProducerConfig.MAX_IN_FLIGHT_
>>> > >>>>>>> REQUESTS_PER_CONNECTION,
>>> > >>>>>>>>> 30);
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>        streamsConfiguration.put(
>>> > >>>>> StreamsConfig.PRODUCER_PREFIX
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>            + ProducerConfig.LINGER_MS_CONFIG,"5");
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>        streamsConfiguration.put(
>>> > >>>>> StreamsConfig.consumerPrefix(
>>> > >>>>>>>>>>>>            ConsumerConfig.MAX_PARTITION_
>>> > >>>>> FETCH_BYTES_CONFIG),20
>>> > >>>>>> *
>>> > >>>>>>>>> 1024 *
>>> > >>>>>>>>>>>> 1024);MS_CONFIG, 10 * 1000);
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> //
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>
>>> > >>>>>>>>
>>> > >>>>>>
>>> > >>>>
>>> > >> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_
>>> > BUFFERING_CONFIG,
>>> > >>>>>>>>> 0);
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>        streamsConfiguration.put(
>>> > >>>>> StreamsConfig.PRODUCER_PREFIX
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>            + ProducerConfig.BATCH_SIZE_CONFIG,163840);
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>        streamsConfiguration.put(
>>> > >>>>> StreamsConfig.PRODUCER_PREFIX
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>            + ProducerConfig.BUFFER_MEMORY_CONFIG,
>>> > >>>>> 335544320);
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>        streamsConfiguration.put(
>>> > >>>>> StreamsConfig.PRODUCER_PREFIX
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>            + ProducerConfig.MAX_IN_FLIGHT_
>>> > >>>>>>> REQUESTS_PER_CONNECTION,
>>> > >>>>>>>>> 30);
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>        streamsConfiguration.put(
>>> > >>>>> StreamsConfig.PRODUCER_PREFIX
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>            + ProducerConfig.LINGER_MS_CONFIG,"5");
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>        streamsConfiguration.put(
>>> > >>>>> StreamsConfig.consumerPrefix(
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>            ConsumerConfig. MAX_PARTITION_FETCH_BYTES_
>>> > >>> CONFIG
>>> > >>>>> ,
>>> > >>>>>>> 20 *
>>> > >>>>>>>>>> 1024 *
>>> > >>>>>>>>>>>> 1024);
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> final StreamsBuilder builder = new StreamsBuilder();
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> final KStream<String, NodeMutation> localDeltaStream =
>>> > >>>>>>>>> builder.stream(
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>            localDeltaTopic,
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>            Consumed.with(
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>                new Serdes.StringSerde(),
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>                new NodeMutationSerde<>()
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>            )
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>        );
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>  KStream<String, NodeState> localHistStream =
>>> > >>>>>>>>>> localDeltaStream.mapValues(
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>            (mutation) -> NodeState
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>                .newBuilder()
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>                .setMeta(
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>                    mutation.getMetaMutation().getMeta()
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>                )
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>                .setValue(
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>                    mutation.getValueMutation().
>>> > >>> getValue()
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>                )
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>                .build()
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>        );
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>  localHistStream.to(
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>            localHistTopic,
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>            Produced.with(new Serdes.StringSerde(), new
>>> > >>>>>>>>>> NodeStateSerde<>())
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>        );
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> streams = new KafkaStreams(builder.build(),
>>> > >>>>>>> streamsConfiguration);
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>        streams.cleanUp();
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>> streams.start();
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>>
>>> > >>>>>>>>>>>
>>> > >>>>>>>>>>
>>> > >>>>>>>>>>
>>> > >>>>>>>>>
>>> > >>>>>>>>
>>> > >>>>>>>>
>>> > >>>>>>>>
>>> > >>>>>>>> --
>>> > >>>>>>>> -- Guozhang
>>> > >>>>>>>>
>>> > >>>>>>>
>>> > >>>>>>
>>> > >>>>>>
>>> > >>>>>>
>>> > >>>>>> --
>>> > >>>>>> -- Guozhang
>>> > >>>>>>
>>> > >>>>>
>>> > >>>>
>>> > >>>>
>>> > >>>>
>>> > >>>> --
>>> > >>>> -- Guozhang
>>> > >>>>
>>> > >>>
>>> > >>
>>> > >>
>>> > >>
>>> > >> --
>>> > >> -- Guozhang
>>> > >>
>>> >
>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>

Reply via email to