Re: JVM crash when closing persistent store (rocksDB)

2016-10-12 Thread Eno Thereska
Depending on how voting goes, the tentative date is Oct 17th:
https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1 


Thanks
Eno
> On 12 Oct 2016, at 16:00, Damian Guy wrote:
> 
> The 0.10.1 release will hopefully be out within the next couple of weeks.
> 
> On Wed, 12 Oct 2016 at 15:52, Pierre Coquentin wrote:
> 
>> OK, it works against trunk and the 0.10.1 branch; both depend on RocksDB
>> 4.9.0 vs 4.4.1 for Kafka 0.10.0.
>> Do you know when 0.10.1 will be released?

Re: JVM crash when closing persistent store (rocksDB)

2016-10-12 Thread Damian Guy
The 0.10.1 release will hopefully be out within the next couple of weeks.

On Wed, 12 Oct 2016 at 15:52, Pierre Coquentin wrote:

> OK, it works against trunk and the 0.10.1 branch; both depend on RocksDB
> 4.9.0 vs 4.4.1 for Kafka 0.10.0.
> Do you know when 0.10.1 will be released?

Re: JVM crash when closing persistent store (rocksDB)

2016-10-12 Thread Pierre Coquentin
OK, it works against trunk and the 0.10.1 branch; both depend on RocksDB
4.9.0 vs 4.4.1 for Kafka 0.10.0.
Do you know when 0.10.1 will be released?
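
Relatedly, a quick way to confirm which rocksdbjni build an application actually
loaded is to print where the RocksDB class came from; the jar name normally
carries the version (e.g. rocksdbjni-4.4.1.jar). A minimal sketch, assuming only
the standard org.rocksdb.RocksDB class from rocksdbjni; the RocksDbVersionCheck
class name is purely illustrative:

    import org.rocksdb.RocksDB;

    public class RocksDbVersionCheck {
        public static void main(String[] args) {
            // Prints the jar the RocksDB class was loaded from, e.g.
            // .../rocksdbjni-4.4.1.jar, which reveals the bundled version.
            System.out.println(RocksDB.class.getProtectionDomain()
                    .getCodeSource().getLocation());
        }
    }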

Re: JVM crash when closing persistent store (rocksDB)

2016-10-11 Thread Pierre Coquentin
Hi,

I already tried to store the RocksDB files somewhere else by specifying the
Kafka state dir property, but no luck, same behavior.
I will try running trunk tomorrow to see if it stops correctly, and I will
keep you informed. There must be something wrong with my configuration,
because I googled and found nothing about this problem.
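
For reference, overriding where Kafka Streams keeps its local RocksDB state
comes down to setting StreamsConfig.STATE_DIR_CONFIG (state.dir). A minimal
sketch; the /var/lib/my-streams-state path and the StateDirExample class name
are purely illustrative:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.streams.StreamsConfig;

    public class StateDirExample {
        public static void main(String[] args) {
            Map<String, Object> configs = new HashMap<>();
            configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
            configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            configs.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
            // state.dir controls where the per-application RocksDB files live
            // (the default is /tmp/kafka-streams).
            configs.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/my-streams-state");
            StreamsConfig config = new StreamsConfig(configs);
            System.out.println(config.getString(StreamsConfig.STATE_DIR_CONFIG));
        }
    }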

On Tue, Oct 11, 2016 at 9:28 PM, Eno Thereska wrote:

> Hi Pierre,
>
> I tried the exact code on macOS and am not getting any errors. Could you
> check whether all the directories in /tmp where Kafka Streams writes the
> RocksDB files are empty? I'm wondering if there is some bad state left over.
>
> Finally, it looks like you are running 0.10.0; could you try running trunk to
> see if the problem still exists? I'm running trunk, and I know there were a
> couple of RocksDB fixes after 0.10.0.
>
> Thanks
> Eno

Re: JVM crash when closing persistent store (rocksDB)

2016-10-11 Thread Eno Thereska
Hi Pierre,

I tried the exact code on macOS and am not getting any errors. Could you check
whether all the directories in /tmp where Kafka Streams writes the RocksDB
files are empty? I'm wondering if there is some bad state left over.
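
As a quick way to rule that out, a minimal sketch that wipes the application's
local state before re-running the test; the /tmp/kafka-streams/test path follows
from the posted config (state.dir = /tmp/kafka-streams, application.id = test),
and the WipeLocalState class name is only illustrative:

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.Comparator;
    import java.util.stream.Stream;

    public class WipeLocalState {
        public static void main(String[] args) throws IOException {
            Path stateDir = Paths.get("/tmp/kafka-streams", "test");
            if (Files.exists(stateDir)) {
                try (Stream<Path> paths = Files.walk(stateDir)) {
                    // delete files before the directories that contain them
                    paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                        try {
                            Files.delete(p);
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    });
                }
            }
        }
    }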

Finally, it looks like you are running 0.10.0; could you try running trunk to
see if the problem still exists? I'm running trunk, and I know there were a
couple of RocksDB fixes after 0.10.0.

Thanks
Eno

> On 11 Oct 2016, at 16:41, Pierre Coquentin wrote:
> 
> Hi,
> 
> I have a simple test where I create a topology builder with one topic and one
> processor using a persistent store; then I create a KafkaStreams instance,
> start it, wait a bit, then close it. Each time, the JVM crashes (segfault)
> when flushing the data. Has anyone already met this kind of problem?
> 
> OS:
> Ubuntu 16.04
> 
> JVM:
> openjdk version "1.8.0_91"
> OpenJDK Runtime Environment (build 1.8.0_91-8u91-b14-3ubuntu1~16.04.1-b14)
> OpenJDK 64-Bit Server VM (build 25.91-b14, mixed mode)
> 
> Kafka:
> kafka_2.11-0.10.0.1
> 
> The Java code to reproduce the problem:
> 
> public static void main(String[] args) throws InterruptedException {
>     Map<String, Object> configs = new HashMap<>();
>     configs.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
>     configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>     configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>     configs.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, StringSerde.class.getName());
>     configs.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, StringSerde.class.getName());
> 
>     TopologyBuilder builder = new TopologyBuilder()
>         .addSource("source", "test")
>         .addProcessor("processor", () -> new Processor<String, String>() {
> 
>             private KeyValueStore<String, String> kvStore;
> 
>             public void init(ProcessorContext context) {
>                 kvStore = (KeyValueStore<String, String>) context.getStateStore("store");
>             }
> 
>             public void process(String key, String value) {
>             }
> 
>             public void close() {
>                 kvStore.close();
>             }
> 
>             @Override
>             public void punctuate(long timestamp) {
>             }
>         }, "source")
>         .addStateStore(Stores.create("store").withKeys(new StringSerde())
>             .withValues(new StringSerde()).persistent().build(), "processor");
> 
>     KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(configs));
>     streams.start();
>     TimeUnit.SECONDS.sleep(20);
>     streams.close();
> }
> 
> 
> Here is the log:
> 
> 
> 11.10.2016 17:27:11 [main] INFO
> [org.apache.kafka.streams.StreamsConfig:178] StreamsConfig values:
> replication.factor = 1
> num.standby.replicas = 0
> metric.reporters = []
> commit.interval.ms = 3
> bootstrap.servers = [localhost:9092]
> state.dir = /tmp/kafka-streams
> partition.grouper = class
> org.apache.kafka.streams.processor.DefaultPartitionGrouper
> state.cleanup.delay.ms = 6
> poll.ms = 100
> zookeeper.connect = localhost:2181
> key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
> metrics.sample.window.ms = 3
> buffered.records.per.partition = 1000
> value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
> timestamp.extractor = class
> org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor
> num.stream.threads = 1
> metrics.num.samples = 2
> application.id = test
> client.id =
> 
> 11.10.2016 17:27:11 [main] INFO
> [org.apache.kafka.streams.processor.internals.StreamThread:170] Creating
> producer client for stream thread [StreamThread-1]
> 11.10.2016 17:27:11 [main] INFO
> [org.apache.kafka.clients.producer.ProducerConfig:178] ProducerConfig
> values:
> metric.reporters = []
> metadata.max.age.ms = 30
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> bootstrap.servers = [localhost:9092]
> ssl.keystore.type = JKS
> sasl.mechanism = GSSAPI
> max.block.ms = 6
> interceptor.classes = null
> ssl.truststore.password = null
> client.id = test-1-StreamThread-1-producer
> ssl.endpoint.identification.algorithm = null
> request.timeout.ms = 3
> acks = 1
> receive.buffer.bytes = 32768
> ssl.truststore.type = JKS
> retries = 0
> ssl.truststore.location = null
> ssl.keystore.password = null
> send.buffer.bytes = 131072
> compression.type = none
> metadata.fetch.timeout.ms = 6
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> buffer.memory = 33554432
> timeout.ms = 3
> key.serializer = class
> org.apache.kafka.common.serialization.ByteArraySerializer
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> block.on.buffer.full = false
> ssl.key.password = null
> sasl.kerberos.min.time.before.relogin = 6
> connec