I upgraded to 2.0.0 and am still seeing the same result. Below is the
program I am using. I am running everything on a single server (CentOS 7,
24 cores, 32 GB RAM, 1 broker, 1 ZooKeeper, single hard drive). I
understand a single hard drive is less than ideal, but I still would not
expect latency over 3 seconds.


case 1.
1 partition for input and 1 partition for output, message size 10 KB.
The producer is given the parameters (3600, 1000, 2)   // 2 messages per
1000 microseconds for 3600 seconds, which translates to 2,000 messages/s.
I still see latency spikes, sometimes reaching 3 seconds.

case 2
50 partitions for input and 50 partitions for output, message size 10 KB.
The producer is given the parameters (3600, 1000, 20)   // 20 messages per
1000 microseconds for 3600 seconds, which translates to 20,000 messages/s.
The latency is not only higher, it also spikes more often.


Any suggestion is appreciated. The target is for each partition to handle
1,000 -- 2,000 messages/s with all latency below 100 ms.
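
(Rough arithmetic on the disk, for what it's worth: in case 1,
2,000 messages/s x 10 KB is about 20 MB/s into the input topic, plus the
same again when the stream writes it back to the output topic, so roughly
40 MB/s of log appends. In case 2 that becomes roughly 400 MB/s, which is
more than a single spinning disk sustains, so the heavier spikes there at
least are consistent with the drive being the bottleneck.)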

====build.gradle======
plugins {
    id 'application'
    id 'java'
}
group 'com.bofa'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.8
mainClassName="main.StreamApp"

repositories {
    mavenCentral()
}

dependencies {
    compile group: 'org.apache.kafka', name: 'kafka-streams', version: '2.0.0'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.0.0'
    compile group: 'io.dropwizard.metrics', name: 'metrics-core', version: '3.2.6'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

========producer========
package main;

import java.util.Properties;

import Util.BusyTimer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
    public static void main(String[] args) {
        final int time = Integer.valueOf(args[0]);      // run duration in seconds
        final long interval = Integer.valueOf(args[1]); // pacing interval in microseconds
        final int batch = Integer.valueOf(args[2]);     // messages sent per interval

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-perf-test-producer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        // build a 10 KB payload
        StringBuffer buffer = new StringBuffer();
        for (int i = 0; i < 10240; i++) {
            buffer.append('a');
        }
        String value = buffer.toString();

        final long speed = 1000000 / interval; // intervals (batches) per second
        Runnable task = new Runnable() {
            int sendNum = 0;

            @Override
            public void run() {
                for (int i = 0; i < batch; i++) {
                    // prefix the send time (nanoTime) so the consumer can compute latency
                    ProducerRecord<String, String> record =
                        new ProducerRecord<>("input", System.nanoTime() + "-" + value);
                    kafkaProducer.send(record);
                    sendNum++;
                }

                // speed * batch == messages per second, so this logs once per second of traffic
                if (sendNum % (speed * batch) == 0) {
                    System.out.println(System.currentTimeMillis() + " : " + sendNum);
                }
            }
        };

        BusyTimer timer = new BusyTimer(interval, time, task);
        timer.spaceMessageWithInterval();
    }
}
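
(One note on the producer: send() here is fire-and-forget, so if the broker
stalls on disk I/O the records just queue up invisibly in the producer's
buffer. A minimal sketch of how the send in the loop above could surface
that, using the standard send callback -- the 100 ms threshold is arbitrary:

    long enqueuedAt = System.nanoTime();
    kafkaProducer.send(record, (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        } else if (System.nanoTime() - enqueuedAt > 100_000_000L) {
            // ack took longer than 100 ms: the delay is on the producer -> broker side
            System.out.println("slow ack: "
                + (System.nanoTime() - enqueuedAt) / 1_000_000 + " ms");
        }
    });

If slow acks line up with the consumer-side spikes, the latency is added
before Kafka Streams even sees the record.)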


============kafka stream=============
package main;

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class StreamApp {
    public static void main(String[] args) {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-stream");
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "simple_stream_1");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.String().getClass().getName());
        // forward producer configs to the streams-internal producer
        streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
            + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
        streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
            + ProducerConfig.LINGER_MS_CONFIG, "5");
        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

        StreamsBuilder builder = new StreamsBuilder();
        // stateless pass-through: read "input", write the records unchanged to "output"
        final KStream<String, String> inputStream = builder.stream(
            "input",
            Consumed.with(new Serdes.StringSerde(), new Serdes.StringSerde())
        );

        inputStream.to(
            "output",
            Produced.with(new Serdes.StringSerde(), new Serdes.StringSerde())
        );

        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
        streams.start();
    }
}
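
(As suggested earlier in this thread, a state listener can rule rebalances
in or out; a minimal sketch, added just before streams.start():

    streams.setStateListener((newState, oldState) -> {
        // a REBALANCING -> RUNNING transition mid-run would line up with a spike
        System.out.println(System.currentTimeMillis()
            + " streams state: " + oldState + " -> " + newState);
    });

If the state never leaves RUNNING during a run, rebalancing can be ruled
out as the cause.)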

=============consumer============================
package main;

import java.util.Collections;
import java.util.Properties;

import com.codahale.metrics.Reservoir;
import com.codahale.metrics.UniformReservoir;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {
    public static void main(String[] args) {
        // args[0] is a String, so it must be parsed; this is messages/s and
        // also the reporting interval
        int expectedSpeed = Integer.valueOf(args[0]);

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-perf-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("output"));

        consumer.poll(0); // trigger the initial partition assignment
        int recNum = 0;

        Reservoir totalRes = new UniformReservoir();

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                // the producer prefixed each value with its System.nanoTime()
                long sendTime = Long.valueOf(record.value().split("-")[0]);
                long takeTime = System.nanoTime() - sendTime;
                if (recNum > 20000) { // skip warm-up messages
                    totalRes.update(takeTime);
                }
                recNum++;

                // report latency percentiles (in ms) once per expectedSpeed messages
                if (recNum % expectedSpeed == 0) {
                    System.out.println("===============" + recNum + "============");
                    System.out.println("  mean: " + totalRes.getSnapshot().getMean() / 1000000);
                    System.out.println("  75%: " + totalRes.getSnapshot().get75thPercentile() / 1000000);
                    System.out.println("  99%: " + totalRes.getSnapshot().get99thPercentile() / 1000000);
                    System.out.println("  99.9%: " + totalRes.getSnapshot().get999thPercentile() / 1000000);
                    System.out.println("  Max: " + totalRes.getSnapshot().getMax() / 1000000);
                    System.out.println("========================================");
                    totalRes = new UniformReservoir();
                }
            }
        }
    }
}
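
(A caveat on this measurement: the Java spec only defines System.nanoTime()
within a single JVM; comparing it across the producer and consumer
processes works here because everything runs on one box where Linux backs
it with a shared monotonic clock. A more portable check, since
ConsumerRecord carries the producer-assigned CreateTime timestamp in
milliseconds:

    long latencyMs = System.currentTimeMillis() - record.timestamp();

Coarser, but independent of the nanoTime origin.)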

==========busy timer=====================
// The idea is to space messages at a fixed interval (like Thread.sleep,
// but sleep is less accurate, so we busy-wait instead).
package Util;

import java.util.ArrayList;

public class BusyTimer {
    long interval; // nanoseconds between task runs
    long duration; // total run time in nanoseconds
    ArrayList<Long> pubTime;
    Runnable task;

    public BusyTimer(long microInterval, long exDurationInSeconds, Runnable task) {
        // pre-size so recording publish times never triggers a resize mid-run
        pubTime = new ArrayList<Long>((int) (exDurationInSeconds * 1000 * 1000 / microInterval + 1));

        this.interval = microInterval * 1000;
        this.duration = exDurationInSeconds * 1000000000L;
        this.task = task;
    }

    private void busywaitUntil(long nano) {
        // spin instead of sleeping: burns a core, but accurate to microseconds
        while (System.nanoTime() < nano) {
        }
    }

    public void spaceMessageWithInterval() {
        long baseTime = System.nanoTime();
        long doneTime = baseTime + duration;
        while (true) {
            task.run();
            pubTime.add(System.nanoTime());
            long targetTime = System.nanoTime() + interval;
            if (System.nanoTime() > doneTime) {
                break;
            }
            busywaitUntil(targetTime);
        }
    }
}
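
(Design note on the busy-wait: it pins one core at 100% for the whole run,
which is acceptable on a 24-core box, and it spaces sends far more tightly
than Thread.sleep can. Since the producer stamps each record with nanoTime
right before send(), the reported end-to-end latency includes producer
batching/linger time as well as the broker and streams hops.)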



On Fri, Aug 24, 2018 at 3:37 PM Nan Xu <nanxu1...@gmail.com> wrote:

> Looks really promising but after upgrade, still show the same result. I
> will post the program soon. Maybe you can see where the problem could be.
>
> Nan
>
> On Thu, Aug 23, 2018, 7:34 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hello Nan,
>>
>> Kafka does not tie up the processing thread to do disk flushing. However,
>> since you are on an older version of Kafka I suspect you're bumping into
>> some old issues that have been resolved in later versions. e.g.
>>
>> https://issues.apache.org/jira/browse/KAFKA-4614
>>
>> I'd suggest you upgrading to latest version (2.0.0) and try again to see
>> if
>> you observe the same pattern.
>>
>> Guozhang
>>
>> On Thu, Aug 23, 2018 at 3:22 PM, Sudhir Babu Pothineni <
>> sbpothin...@gmail.com> wrote:
>>
>> > I will wait for the expert’s opinion:
>> >
>> > Did the Transparent Huge Pages(THP) disabled on the broker machine?
>> it’s a
>> > Linux kernel parameter.
>> >
>> > -Sudhir
>> >
>> > > On Aug 23, 2018, at 4:46 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>> > >
>> > > I think I found where the problem is, how to solve and why, still not
>> > sure.
>> > >
>> > > it related to disk (maybe flushing?). I did a single machine, single
>> > node,
>> > > single topic and single partition setup.  producer pub as 2000
>> message/s,
>> > > 10K size message size. and single key.
>> > >
>> > > when I save kafka log to the  memory based partition, I don't see a
>> > latency
>> > > over 100ms. top around 70ms.
>> > > when I save to a ssd hard drive. I do see latency spike, sometime over
>> > 1s.
>> > >
>> > > adjust the log.flush.interval.messages / log.flush.interval.ms has
>> impact,
>> > > but only to make thing worse... need suggestion.
>> > >
>> > > I think log flushing is totally async and done by OS in the default
>> > > setting. does kafka has to wait when flushing data to disk?
>> > >
>> > > Thanks,
>> > > Nan
>> > >
>> > >
>> > >
>> > >> On Wed, Aug 22, 2018 at 11:37 PM Guozhang Wang <wangg...@gmail.com>
>> > wrote:
>> > >>
>> > >> Given your application code:
>> > >>
>> > >> ----------------------------
>> > >>
>> > >> final KStream<String, NodeMutation> localDeltaStream =
>> builder.stream(
>> > >>
>> > >>            localDeltaTopic,
>> > >>
>> > >>            Consumed.with(
>> > >>
>> > >>                new Serdes.StringSerde(),
>> > >>
>> > >>                new NodeMutationSerde<>()
>> > >>
>> > >>            )
>> > >>
>> > >>        );
>> > >>
>> > >>  KStream<String, NodeState> localHistStream =
>> > localDeltaStream.mapValues(
>> > >>
>> > >>            (mutation) -> NodeState
>> > >>
>> > >>                .newBuilder()
>> > >>
>> > >>                .setMeta(
>> > >>
>> > >>                    mutation.getMetaMutation().getMeta()
>> > >>
>> > >>                )
>> > >>
>> > >>                .setValue(
>> > >>
>> > >>                    mutation.getValueMutation().getValue()
>> > >>
>> > >>                )
>> > >>
>> > >>                .build()
>> > >>
>> > >>        );
>> > >>
>> > >>  localHistStream.to(
>> > >>
>> > >>            localHistTopic,
>> > >>
>> > >>            Produced.with(new Serdes.StringSerde(), new
>> > NodeStateSerde<>())
>> > >>
>> > >>        );
>> > >>
>> > >> ----------------------------
>> > >>
>> > >> which is pure stateless, committing will not touch on an state
>> > directory at
>> > >> all. Hence committing only involves committing offsets to Kafka.
>> > >>
>> > >>
>> > >> Guozhang
>> > >>
>> > >>
>> > >>> On Wed, Aug 22, 2018 at 8:11 PM, Nan Xu <nanxu1...@gmail.com>
>> wrote:
>> > >>>
>> > >>> I was suspecting that too, but I also noticed the spike is not
>> spaced
>> > >>> around 10s. to further prove it. I put kafka data directory in a
>> memory
>> > >>> based directory.  it still has such latency spikes.  I am going to
>> test
>> > >> it
>> > >>> on a single broker, single partition env.  will report back soon.
>> > >>>
>> > >>> On Wed, Aug 22, 2018 at 5:14 PM Guozhang Wang <wangg...@gmail.com>
>> > >> wrote:
>> > >>>
>> > >>>> Hello Nan,
>> > >>>>
>> > >>>> Thanks for the detailed information you shared. When Kafka Streams
>> is
>> > >>>> normally running, no rebalances should be triggered unless some of
>> the
>> > >>>> instances (in your case, docker containers) have soft failures.
>> > >>>>
>> > >>>> I suspect the latency spike is due to the commit intervals: streams
>> > >> will
>> > >>>> try to commit its offset at a regular paces, which may increase
>> > >> latency.
>> > >>> It
>> > >>>> is controlled by the "commit.interval.ms" config value. I saw
>> that in
>> > >>> your
>> > >>>> original email you've set it to 10 * 1000 (10 seconds). Is that
>> > aligned
>> > >>>> with the frequency you observe latency spikes?
>> > >>>>
>> > >>>>
>> > >>>> Guozhang
>> > >>>>
>> > >>>>
>> > >>>>> On Sun, Aug 19, 2018 at 10:41 PM, Nan Xu <nanxu1...@gmail.com>
>> > wrote:
>> > >>>>>
>> > >>>>> did more test and and make the test case simple.
>> > >>>>> all the setup now is a single physical machine. running 3 docker
>> > >>>> instance.
>> > >>>>> a1, a2, a3
>> > >>>>>
>> > >>>>> kafka + zookeeper running on all of those docker containers.
>> > >>>>> producer running on a1, send a single key,  update speed 2000
>> > >>> message/s,
>> > >>>>> each message is 10K size.
>> > >>>>> 3 consumer(different group)  are running. one on each docker.
>> > >>>>> all topics are pre-created.
>> > >>>>> in startup, I do see some latency greater than 100ms, which is
>> fine.
>> > >>> and
>> > >>>>> then everything is good. latency is low and consumer don't see
>> > >> anything
>> > >>>>> over 100ms for a while.
>> > >>>>> then I see a few messages have latency over 100ms. then back to
>> > >> normal,
>> > >>>>> then happen again..... do seems like gc problem. but I check the
>> gc
>> > >>>> log.  I
>> > >>>>> don't think it can cause over 100ms. (both are G1 collector)
>> > >>>>>
>> > >>>>> after the stream stable running( exclude the startup), the first
>> > >>> message
>> > >>>>> over 100ms take 179ms  and the gc ( it has a 30ms pause, but
>> should
>> > >> not
>> > >>>>> cause a 179ms end to end).
>> > >>>>>
>> > >>>>> FROM APP
>> > >>>>>
>> > >>>>> 2018-08-20T00:28:47.118-0500: 406.838: [GC (Allocation Failure)
>> > >>>>> 3184739K->84018K(5947904K), 0.0093730 secs]
>> > >>>>>
>> > >>>>> 2018-08-20T00:28:56.094-0500: 415.814: [GC (Allocation Failure)
>> > >>>>> 3184690K->84280K(6053888K), 0.0087473 secs]
>> > >>>>>
>> > >>>>> 2018-08-20T00:29:05.069-0500: 424.789: [GC (Allocation Failure)
>> > >>>>> 3301176K->84342K(6061056K), 0.0127339 secs]
>> > >>>>>
>> > >>>>> 2018-08-20T00:29:14.234-0500: 433.954: [GC (Allocation Failure)
>> > >>>>> 3301238K->84624K(6143488K), 0.0140844 secs]
>> > >>>>>
>> > >>>>> 2018-08-20T00:29:24.523-0500: 444.243: [GC (Allocation Failure)
>> > >>>>> 3386000K->89949K(6144000K), 0.0108118 secs]
>> > >>>>>
>> > >>>>>
>> > >>>>>
>> > >>>>> kafka a1
>> > >>>>>
>> > >>>>> 2018-08-20T00:29:14.306-0500: 7982.657: [GC pause (G1 Evacuation
>> > >> Pause)
>> > >>>>> (young), 0.0214200 secs]
>> > >>>>>
>> > >>>>>   [Parallel Time: 17.2 ms, GC Workers: 8]
>> > >>>>>
>> > >>>>>      [GC Worker Start (ms): Min: 7982657.5, Avg: 7982666.9, Max:
>> > >>>>> 7982673.8, Diff: 16.3]
>> > >>>>>
>> > >>>>>      [Ext Root Scanning (ms): Min: 0.0, Avg: 0.2, Max: 1.5, Diff:
>> > >> 1.5,
>> > >>>>> Sum: 1.5]
>> > >>>>>
>> > >>>>>      [Update RS (ms): Min: 0.0, Avg: 1.0, Max: 6.5, Diff: 6.5,
>> Sum:
>> > >>> 8.4]
>> > >>>>>
>> > >>>>>         [Processed Buffers: Min: 0, Avg: 4.6, Max: 13, Diff: 13,
>> > >> Sum:
>> > >>>> 37]
>> > >>>>>
>> > >>>>>      [Scan RS (ms): Min: 0.0, Avg: 0.9, Max: 2.0, Diff: 2.0, Sum:
>> > >> 7.1]
>> > >>>>>
>> > >>>>>      [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff:
>> > >>> 0.0,
>> > >>>>> Sum: 0.0]
>> > >>>>>
>> > >>>>>      [Object Copy (ms): Min: 0.0, Avg: 4.6, Max: 6.5, Diff: 6.5,
>> > >> Sum:
>> > >>>>> 36.5]
>> > >>>>>
>> > >>>>>      [Termination (ms): Min: 0.0, Avg: 0.4, Max: 0.9, Diff: 0.9,
>> > >> Sum:
>> > >>>> 2.9]
>> > >>>>>
>> > >>>>>         [Termination Attempts: Min: 1, Avg: 10.4, Max: 25, Diff:
>> 24,
>> > >>>> Sum:
>> > >>>>> 83]
>> > >>>>>
>> > >>>>>      [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff:
>> 0.0,
>> > >>>> Sum:
>> > >>>>> 0.1]
>> > >>>>>
>> > >>>>>      [GC Worker Total (ms): Min: 0.2, Avg: 7.1, Max: 16.4, Diff:
>> > >> 16.2,
>> > >>>>> Sum: 56.5]
>> > >>>>>
>> > >>>>>      [GC Worker End (ms): Min: 7982673.9, Avg: 7982674.0, Max:
>> > >>>> 7982674.5,
>> > >>>>> Diff: 0.6]
>> > >>>>>
>> > >>>>>   [Code Root Fixup: 0.0 ms]
>> > >>>>>
>> > >>>>>   [Code Root Purge: 0.0 ms]
>> > >>>>>
>> > >>>>>   [Clear CT: 1.0 ms]
>> > >>>>>
>> > >>>>>   [Other: 3.2 ms]
>> > >>>>>
>> > >>>>>      [Choose CSet: 0.0 ms]
>> > >>>>>
>> > >>>>>      [Ref Proc: 1.9 ms]
>> > >>>>>
>> > >>>>>      [Ref Enq: 0.0 ms]
>> > >>>>>
>> > >>>>>      [Redirty Cards: 0.8 ms]
>> > >>>>>
>> > >>>>>      [Humongous Register: 0.1 ms]
>> > >>>>>
>> > >>>>>      [Humongous Reclaim: 0.0 ms]
>> > >>>>>
>> > >>>>>      [Free CSet: 0.2 ms]
>> > >>>>>
>> > >>>>>   [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K
>> Heap:
>> > >>>>> 265.5M(1024.0M)->217.9M(1024.0M)]
>> > >>>>>
>> > >>>>> [Times: user=0.05 sys=0.00, real=0.03 secs]
>> > >>>>>
>> > >>>>> 2018-08-20T00:29:16.074-0500: 7984.426: [GC pause (G1 Evacuation
>> > >> Pause)
>> > >>>>> (young), 0.0310004 secs]
>> > >>>>>
>> > >>>>>   [Parallel Time: 24.4 ms, GC Workers: 8]
>> > >>>>>
>> > >>>>>      [GC Worker Start (ms): Min: 7984426.1, Avg: 7984436.8, Max:
>> > >>>>> 7984444.7, Diff: 18.6]
>> > >>>>>
>> > >>>>>      [Ext Root Scanning (ms): Min: 0.0, Avg: 0.3, Max: 1.9, Diff:
>> > >> 1.9,
>> > >>>>> Sum: 2.0]
>> > >>>>>
>> > >>>>>      [Update RS (ms): Min: 0.0, Avg: 4.1, Max: 11.8, Diff: 11.8,
>> > >> Sum:
>> > >>>>> 32.9]
>> > >>>>>
>> > >>>>>         [Processed Buffers: Min: 0, Avg: 5.4, Max: 25, Diff: 25,
>> > >> Sum:
>> > >>>> 43]
>> > >>>>>
>> > >>>>>      [Scan RS (ms): Min: 0.1, Avg: 3.2, Max: 11.3, Diff: 11.2,
>> Sum:
>> > >>>> 25.5]
>> > >>>>>
>> > >>>>>      [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff:
>> > >>> 0.0,
>> > >>>>> Sum: 0.0]
>> > >>>>>
>> > >>>>>      [Object Copy (ms): Min: 0.0, Avg: 4.1, Max: 6.9, Diff: 6.9,
>> > >> Sum:
>> > >>>>> 32.7]
>> > >>>>>
>> > >>>>>      [Termination (ms): Min: 0.0, Avg: 0.8, Max: 1.6, Diff: 1.6,
>> > >> Sum:
>> > >>>> 6.8]
>> > >>>>>
>> > >>>>>         [Termination Attempts: Min: 1, Avg: 5.4, Max: 11, Diff:
>> 10,
>> > >>> Sum:
>> > >>>>> 43]
>> > >>>>>
>> > >>>>>      [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff:
>> 0.0,
>> > >>>> Sum:
>> > >>>>> 0.1]
>> > >>>>>
>> > >>>>>      [GC Worker Total (ms): Min: 4.4, Avg: 12.5, Max: 23.6, Diff:
>> > >>> 19.1,
>> > >>>>> Sum: 100.1]
>> > >>>>>
>> > >>>>>      [GC Worker End (ms): Min: 7984449.1, Avg: 7984449.3, Max:
>> > >>>> 7984449.9,
>> > >>>>> Diff: 0.8]
>> > >>>>>
>> > >>>>>   [Code Root Fixup: 0.0 ms]
>> > >>>>>
>> > >>>>>   [Code Root Purge: 0.0 ms]
>> > >>>>>
>> > >>>>>   [Clear CT: 1.1 ms]
>> > >>>>>
>> > >>>>>   [Other: 5.5 ms]
>> > >>>>>
>> > >>>>>      [Choose CSet: 0.0 ms]
>> > >>>>>
>> > >>>>>      [Ref Proc: 2.2 ms]
>> > >>>>>
>> > >>>>>      [Ref Enq: 0.0 ms]
>> > >>>>>
>> > >>>>>      [Redirty Cards: 2.8 ms]
>> > >>>>>
>> > >>>>>      [Humongous Register: 0.1 ms]
>> > >>>>>
>> > >>>>>      [Humongous Reclaim: 0.0 ms]
>> > >>>>>
>> > >>>>>      [Free CSet: 0.1 ms]
>> > >>>>>
>> > >>>>>   [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K
>> Heap:
>> > >>>>> 265.9M(1024.0M)->218.4M(1024.0M)]
>> > >>>>>
>> > >>>>> [Times: user=0.05 sys=0.00, real=0.03 secs]
>> > >>>>>
>> > >>>>>
>> > >>>>> so when kafka stream running, is there any trying to rebalance?
>> > >> either
>> > >>>>> broker rebalance or client rebalance?
>> > >>>>> any kind of test to see what cause the trouble?
>> > >>>>>
>> > >>>>> Thanks,
>> > >>>>> Nan
>> > >>>>>
>> > >>>>>
>> > >>>>>
>> > >>>>>
>> > >>>>>
>> > >>>>>
>> > >>>>> On Sun, Aug 19, 2018 at 10:34 PM Guozhang Wang <
>> wangg...@gmail.com>
>> > >>>> wrote:
>> > >>>>>
>> > >>>>>> Okay, so you're measuring end-to-end time from producer -> broker
>> > >> ->
>> > >>>>>> streams' consumer client, there are multiple phases that can
>> > >>> contribute
>> > >>>>> to
>> > >>>>>> the 100ms latency, and I cannot tell if stream's consumer phase
>> is
>> > >>> the
>> > >>>>>> major contributor. For example, if the topic was not created
>> > >> before,
>> > >>>> then
>> > >>>>>> when the broker first received a produce request it may need to
>> > >>> create
>> > >>>>> the
>> > >>>>>> topic, which involves multiple steps including writes to ZK which
>> > >>> could
>> > >>>>>> take time.
>> > >>>>>>
>> > >>>>>> There are some confusions from your description: you mentioned
>> > >> "Kafka
>> > >>>>>> cluster is already up and running", but I think you are referring
>> > >> to
>> > >>>>> "Kafka
>> > >>>>>> Streams application instances are already up and running", right?
>> > >>> Since
>> > >>>>>> only the latter has rebalance process, while the Kafak brokers do
>> > >> not
>> > >>>>>> really have "rebalances" except balancing load by migrating
>> > >>> partitions.
>> > >>>>>>
>> > >>>>>> Guozhang
>> > >>>>>>
>> > >>>>>>
>> > >>>>>>
>> > >>>>>> On Sun, Aug 19, 2018 at 7:47 PM, Nan Xu <nanxu1...@gmail.com>
>> > >> wrote:
>> > >>>>>>
>> > >>>>>>> right, so my kafka cluster is already up and running for a
>> while,
>> > >>>> and I
>> > >>>>>> can
>> > >>>>>>> see from the log all broker instance already change from
>> > >> rebalance
>> > >>> to
>> > >>>>>>> running.
>> > >>>>>>>
>> > >>>>>>> I did a another test.
>> > >>>>>>> from producer, right before the message get send to the broker,
>> I
>> > >>>> put a
>> > >>>>>>> timestamp in the message. and from the consumer side which is
>> > >> after
>> > >>>>>> stream
>> > >>>>>>> processing, I compare this timestamp with current time. I can
>> see
>> > >>>> some
>> > >>>>>>> message processing time is above 100ms on some real powerful
>> > >>>> hardware.
>> > >>>>>> and
>> > >>>>>>> from my application gc, all the gc time is below 1ms, kafka gc
>> > >> only
>> > >>>>>> happen
>> > >>>>>>> once and below 1ms too.
>> > >>>>>>>
>> > >>>>>>> very puzzled. is there any communication to zookeeper, if not
>> get
>> > >>>>>> response,
>> > >>>>>>> will cause the broker to pause? I don't think that's the case
>> but
>> > >>> at
>> > >>>>> this
>> > >>>>>>> time don't know what else can be suspected.
>> > >>>>>>>
>> > >>>>>>> Nan
>> > >>>>>>>
>> > >>>>>>> On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang <
>> > >> wangg...@gmail.com>
>> > >>>>>> wrote:
>> > >>>>>>>
>> > >>>>>>>> Hello Nan,
>> > >>>>>>>>
>> > >>>>>>>> Note that Streams may need some time to rebalance and assign
>> > >>> tasks
>> > >>>>> even
>> > >>>>>>> if
>> > >>>>>>>> you only starts with one instance.
>> > >>>>>>>>
>> > >>>>>>>> I'd suggest you register your state listener in Kafka Streams
>> > >> via
>> > >>>>>>>> KafkaStreams#setStateListener, and your customized
>> > >> StateListener
>> > >>>>> should
>> > >>>>>>>> record when the state transits from REBALANCING to RUNNING
>> > >> since
>> > >>>> only
>> > >>>>>>> after
>> > >>>>>>>> that the streams client will start to process the first record.
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> Guozhang
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu <nanxu1...@gmail.com>
>> > >>>> wrote:
>> > >>>>>>>>
>> > >>>>>>>>> thanks, which JMX properties indicate  "processing latency
>> > >>>>> spikes"  /
>> > >>>>>>>>> "throughput"
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>> On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <
>> > >>>>>> matth...@confluent.io
>> > >>>>>>>>
>> > >>>>>>>>> wrote:
>> > >>>>>>>>>
>> > >>>>>>>>>> I cannot spot any obvious reasons.
>> > >>>>>>>>>>
>> > >>>>>>>>>> As you consume from the result topic for verification, we
>> > >>>> should
>> > >>>>>>> verify
>> > >>>>>>>>>> that the latency spikes original on write and not on read:
>> > >>> you
>> > >>>>>> might
>> > >>>>>>>>>> want to have a look into Kafka Streams JMX metric to see if
>> > >>>>>>> processing
>> > >>>>>>>>>> latency spikes or throughput drops.
>> > >>>>>>>>>>
>> > >>>>>>>>>> Also watch for GC pauses in the JVM.
>> > >>>>>>>>>>
>> > >>>>>>>>>> Hope this helps.
>> > >>>>>>>>>>
>> > >>>>>>>>>>
>> > >>>>>>>>>> -Matthias
>> > >>>>>>>>>>
>> > >>>>>>>>>>> On 8/17/18 12:13 PM, Nan Xu wrote:
>> > >>>>>>>>>>> btw, I am using version 0.10.2.0
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <
>> > >>> nanxu1...@gmail.com>
>> > >>>>>>> wrote:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> I am working on a kafka stream app, and see huge latency
>> > >>>>>> variance,
>> > >>>>>>>>>>>> wondering what can cause this?
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> the processing is very simple and don't have state,
>> > >>>> linger.ms
>> > >>>>>>>> already
>> > >>>>>>>>>>>> change to 5ms. the message size is around 10K byes and
>> > >>>>> published
>> > >>>>>>> as
>> > >>>>>>>>> 2000
>> > >>>>>>>>>>>> messages/s, network is 10G.  using a regular consumer
>> > >>> watch
>> > >>>>> the
>> > >>>>>>>>>>>> localHistTopic  topic and just every 2000 message print
>> > >>> out
>> > >>>> a
>> > >>>>>>>> counter,
>> > >>>>>>>>>> it
>> > >>>>>>>>>>>> usually every second I get a count 2000 as the publish
>> > >>>> speed,
>> > >>>>>> but
>> > >>>>>>>>>> sometime
>> > >>>>>>>>>>>> I see it stall for 3 or more seconds and then print out
>> > >> a
>> > >>>> few
>> > >>>>>>> count.
>> > >>>>>>>>>> like
>> > >>>>>>>>>>>> cpu is paused during that time or message being
>> > >>> cache/batch
>> > >>>>> then
>> > >>>>>>>>>> processed.
>> > >>>>>>>>>>>> any suggestion?
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>  final Properties streamsConfiguration = new
>> > >>> Properties();
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>> > >>>>>>>>>>>> applicationId);
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >> streamsConfiguration.put(StreamsConfig.CLIENT_ID_
>> > >>>>> CONFIG,
>> > >>>>>>>>>> clientId);
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_
>> > >>>>>>>>> SERVERS_CONFIG,
>> > >>>>>>>>>>>> bootstrapServers);
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_
>> > >>>>>>>>> SERDE_CLASS_CONFIG,
>> > >>>>>>>>>>>> Serdes.String()
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>            .getClass().getName());
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
>> > >>>>> MS_CONFIG,
>> > >>>>>>>>>>>> 10 * 1000);
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> //
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>
>> > >>>>>>>>
>> > >>>>>>
>> > >>>>
>> > >> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_
>> > BUFFERING_CONFIG,
>> > >>>>>>>>> 0);
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>        streamsConfiguration.put(
>> > >>>>> StreamsConfig.PRODUCER_PREFIX
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>            + ProducerConfig.BATCH_SIZE_CONFIG,163840);
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>        streamsConfiguration.put(
>> > >>>>> StreamsConfig.PRODUCER_PREFIX
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>            + ProducerConfig.BUFFER_MEMORY_CONFIG,
>> > >>>>> 335544320);
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>        streamsConfiguration.put(
>> > >>>>> StreamsConfig.PRODUCER_PREFIX
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>            + ProducerConfig.MAX_IN_FLIGHT_
>> > >>>>>>> REQUESTS_PER_CONNECTION,
>> > >>>>>>>>> 30);
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>        streamsConfiguration.put(
>> > >>>>> StreamsConfig.PRODUCER_PREFIX
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>            + ProducerConfig.LINGER_MS_CONFIG,"5");
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>        streamsConfiguration.put(
>> > >>>>> StreamsConfig.consumerPrefix(
>> > >>>>>>>>>>>>            ConsumerConfig.MAX_PARTITION_
>> > >>>>> FETCH_BYTES_CONFIG),20
>> > >>>>>> *
>> > >>>>>>>>> 1024 *
>> > >>>>>>>>>>>> 1024);MS_CONFIG, 10 * 1000);
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> //
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>
>> > >>>>>>>>
>> > >>>>>>
>> > >>>>
>> > >> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_
>> > BUFFERING_CONFIG,
>> > >>>>>>>>> 0);
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>        streamsConfiguration.put(
>> > >>>>> StreamsConfig.PRODUCER_PREFIX
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>            + ProducerConfig.BATCH_SIZE_CONFIG,163840);
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>        streamsConfiguration.put(
>> > >>>>> StreamsConfig.PRODUCER_PREFIX
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>            + ProducerConfig.BUFFER_MEMORY_CONFIG,
>> > >>>>> 335544320);
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>        streamsConfiguration.put(
>> > >>>>> StreamsConfig.PRODUCER_PREFIX
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>            + ProducerConfig.MAX_IN_FLIGHT_
>> > >>>>>>> REQUESTS_PER_CONNECTION,
>> > >>>>>>>>> 30);
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>        streamsConfiguration.put(
>> > >>>>> StreamsConfig.PRODUCER_PREFIX
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>            + ProducerConfig.LINGER_MS_CONFIG,"5");
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>        streamsConfiguration.put(
>> > >>>>> StreamsConfig.consumerPrefix(
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>            ConsumerConfig. MAX_PARTITION_FETCH_BYTES_
>> > >>> CONFIG
>> > >>>>> ,
>> > >>>>>>> 20 *
>> > >>>>>>>>>> 1024 *
>> > >>>>>>>>>>>> 1024);
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> final StreamsBuilder builder = new StreamsBuilder();
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> final KStream<String, NodeMutation> localDeltaStream =
>> > >>>>>>>>> builder.stream(
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>            localDeltaTopic,
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>            Consumed.with(
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>                new Serdes.StringSerde(),
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>                new NodeMutationSerde<>()
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>            )
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>        );
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>  KStream<String, NodeState> localHistStream =
>> > >>>>>>>>>> localDeltaStream.mapValues(
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>            (mutation) -> NodeState
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>                .newBuilder()
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>                .setMeta(
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>                    mutation.getMetaMutation().getMeta()
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>                )
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>                .setValue(
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>                    mutation.getValueMutation().
>> > >>> getValue()
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>                )
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>                .build()
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>        );
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>  localHistStream.to(
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>            localHistTopic,
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>            Produced.with(new Serdes.StringSerde(), new
>> > >>>>>>>>>> NodeStateSerde<>())
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>        );
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> streams = new KafkaStreams(builder.build(),
>> > >>>>>>> streamsConfiguration);
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>        streams.cleanUp();
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> streams.start();
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>
>> > >>>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> --
>> > >>>>>>>> -- Guozhang
>> > >>>>>>>>
>> > >>>>>>>
>> > >>>>>>
>> > >>>>>>
>> > >>>>>>
>> > >>>>>> --
>> > >>>>>> -- Guozhang
>> > >>>>>>
>> > >>>>>
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>> --
>> > >>>> -- Guozhang
>> > >>>>
>> > >>>
>> > >>
>> > >>
>> > >>
>> > >> --
>> > >> -- Guozhang
>> > >>
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>