Hi,

during a rebalance, partitions are reassigned, and thus the shards of a
store might move from one instance/thread to another. This can happen
at any time, and you need to rediscover the shard/store afterwards.
Your code must therefore catch this exception
(InvalidStateStoreException) and retry the query after rediscovering
the store.
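
As a rough sketch of such a retry (not taken from the Kafka code base):
the class StoreRetry, its retry() method and all parameter names below
are hypothetical. It is written against a plain Supplier so it runs
without Kafka on the classpath; in your application the supplier would
wrap your existing streams.store("quota-table",
QueryableStoreTypes.keyValueStore()) call, and the retryable type would
be InvalidStateStoreException.

```java
import java.util.function.Supplier;

// Hypothetical helper: re-runs a lookup while it fails with a retryable exception.
final class StoreRetry {
    private StoreRetry() {}

    static <T> T retry(Supplier<T> lookup,
                       Class<? extends RuntimeException> retryable,
                       int maxAttempts,
                       long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // Each call performs a fresh lookup, i.e. the store is rediscovered.
                return lookup.get();
            } catch (RuntimeException e) {
                if (!retryable.isInstance(e)) {
                    throw e; // unrelated failure: do not retry
                }
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs); // give the rebalance time to finish
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break; // stop retrying if the thread was interrupted
                }
            }
        }
        throw last; // all attempts failed with the retryable exception
    }
}
```

The number of attempts is bounded on purpose, so a scheduled job that
cannot get the store simply fails this run and tries again on the next
one instead of blocking forever.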


-Matthias

On 9/15/17 3:44 AM, Jari Väimölä wrote:
> Hello,
> 
> Here is the code snippet.
> 
> 145
> 146    public ReadOnlyKeyValueStore<String,QuotasMsg> getQuotaStore() {
> 147        return this.runner.streams.store("quota-table", QueryableStoreTypes.keyValueStore());
> 148    }
> 149
> 
> Thanks,
> Jari
> 
> 
> On 15.09.2017 12:30, Ted Yu wrote:
>> bq.  at com.mytest.csd.kafka.KafkaDeduplicator.getQuotaStore(KafkaDeduplicator.java:147)
>>
>> Can you show us the relevant code snippet for the above method?
>>
>> On Fri, Sep 15, 2017 at 2:20 AM, Jari Väimölä <jari.vain...@lekane.com>
>> wrote:
>>
>>> Hello all,
>>> I have an Apache Kafka Streams application running in a Docker
>>> container. It writes to three output topics.
>>> In my environment there are two brokers and one ZooKeeper instance,
>>> all running on different hosts.
>>> After starting my streams application, the following error logs are
>>> printed:
>>>
>>> INFO  [2017-09-15 07:20:55,389] org.eclipse.jetty.server.Server: Started @3845ms
>>> INFO  [2017-09-15 07:21:00,003] com.mytest.csd.jobs.InstancesJob: Starting InstancesJob, writing to my.instances.dedup
>>> INFO  [2017-09-15 07:21:00,003] com.mytest.csd.kafka.KafkaDeduplicator: Current state: RUNNING
>>> ERROR [2017-09-15 07:21:00,007] com.mytest.csd.jobs.InstancesJob: Error running InstancesJob
>>> ! org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, instance-details-table, may have migrated to another instance.
>>> ! at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
>>> ! at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:728)
>>> ! at com.mytest.csd.kafka.KafkaDeduplicator.getInstanceDetailsStore(KafkaDeduplicator.java:143)
>>> ! at com.mytest.csd.jobs.InstancesJob.doJob(InstancesJob.java:66)
>>> ! at de.spinscale.dropwizard.jobs.Job.execute(Job.java:33)
>>> ! at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
>>> ! at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
>>> INFO  [2017-09-15 07:21:00,012] com.mytest.csd.jobs.CloudDetailsJob: Starting CloudDetailsJob, writing to my.clouds.dedup
>>> INFO  [2017-09-15 07:21:00,012] com.mytest.csd.kafka.KafkaDeduplicator: Current state: RUNNING
>>> ERROR [2017-09-15 07:21:00,012] com.mytest.csd.jobs.CloudDetailsJob: Error running CloudDetailsJob
>>> ! org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, cloud-details-table, may have migrated to another instance.
>>> ! at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
>>> ! at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:728)
>>> ! at com.mytest.csd.kafka.KafkaDeduplicator.getCloudDetailsStore(KafkaDeduplicator.java:151)
>>> ! at com.mytest.csd.jobs.CloudDetailsJob.doJob(CloudDetailsJob.java:56)
>>> ! at de.spinscale.dropwizard.jobs.Job.execute(Job.java:33)
>>> ! at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
>>> ! at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
>>> INFO  [2017-09-15 07:21:00,013] com.mytest.csd.jobs.QuotaJob: Starting QuotaJob, writing to my.quota
>>> INFO  [2017-09-15 07:21:00,013] com.mytest.csd.kafka.KafkaDeduplicator: Current state: RUNNING
>>> ERROR [2017-09-15 07:21:00,013] com.mytest.csd.jobs.QuotaJob: Error running QuotaJob
>>> ! org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, quota-table, may have migrated to another instance.
>>> ! at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
>>> ! at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:728)
>>> ! at com.mytest.csd.kafka.KafkaDeduplicator.getQuotaStore(KafkaDeduplicator.java:147)
>>> ! at com.mytest.csd.jobs.QuotaJob.doJob(QuotaJob.java:56)
>>> ! at de.spinscale.dropwizard.jobs.Job.execute(Job.java:33)
>>> ! at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
>>> ! at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
>>>
>>> After that, the same three errors are logged periodically, at
>>> one-minute intervals, until I stop my streams application.
>>> The same happens when I start the application again.
>>>
>>> What might cause this issue?
>>>
>>> Thanks and Best Regards,
>>> Jari
>>>
>>>
>>> Here is the kafka configuration of my stream application:
>>>
>>> INFO  [2017-09-15 07:20:54,099]
>>> org.apache.kafka.clients.producer.ProducerConfig:
>>> ProducerConfig values:
>>>      acks = all
>>>      batch.size = 16384
>>>      bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092]
>>>      buffer.memory = 33554432
>>>      client.id =
>>>      compression.type = none
>>>      connections.max.idle.ms = 540000
>>>      enable.idempotence = false
>>>      interceptor.classes = null
>>>      key.serializer = class org.apache.kafka.common.serial
>>> ization.StringSerializer
>>>      linger.ms = 0
>>>      max.block.ms = 60000
>>>      max.in.flight.requests.per.connection = 5
>>>      max.request.size = 1048576
>>>      metadata.max.age.ms = 300000
>>>      metric.reporters = []
>>>      metrics.num.samples = 2
>>>      metrics.recording.level = INFO
>>>      metrics.sample.window.ms = 30000
>>>      partitioner.class = class org.apache.kafka.clients.produ
>>> cer.internals.DefaultPartitioner
>>>      receive.buffer.bytes = 32768
>>>      reconnect.backoff.max.ms = 1000
>>>      reconnect.backoff.ms = 50
>>>      request.timeout.ms = 30000
>>>      retries = 0
>>>      retry.backoff.ms = 100
>>>      sasl.jaas.config = null
>>>      sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>      sasl.kerberos.min.time.before.relogin = 60000
>>>      sasl.kerberos.service.name = null
>>>      sasl.kerberos.ticket.renew.jitter = 0.05
>>>      sasl.kerberos.ticket.renew.window.factor = 0.8
>>>      sasl.mechanism = GSSAPI
>>>      security.protocol = PLAINTEXT
>>>      send.buffer.bytes = 131072
>>>      ssl.cipher.suites = null
>>>      ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>      ssl.endpoint.identification.algorithm = null
>>>      ssl.key.password = null
>>>      ssl.keymanager.algorithm = SunX509
>>>      ssl.keystore.location = null
>>>      ssl.keystore.password = null
>>>      ssl.keystore.type = JKS
>>>      ssl.protocol = TLS
>>>      ssl.provider = null
>>>      ssl.secure.random.implementation = null
>>>      ssl.trustmanager.algorithm = PKIX
>>>      ssl.truststore.location = null
>>>      ssl.truststore.password = null
>>>      ssl.truststore.type = JKS
>>>      transaction.timeout.ms = 60000
>>>      transactional.id = null
>>>      value.serializer = class org.apache.kafka.common.serial
>>> ization.StringSerializer
>>>
>>> INFO  [2017-09-15 07:20:54,156]
>>> org.apache.kafka.common.utils.AppInfoParser:
>>> Kafka version : 0.11.0.0
>>> INFO  [2017-09-15 07:20:54,156]
>>> org.apache.kafka.common.utils.AppInfoParser:
>>> Kafka commitId : cb8625948210849f
>>> INFO  [2017-09-15 07:20:54,157] com.mytest.csd.kafka.KafkaDeduplicator:
>>> Initializing StreamRunner...
>>> INFO  [2017-09-15 07:20:54,169] org.apache.kafka.streams.StreamsConfig:
>>> StreamsConfig values:
>>>      application.id = cloud-stream-deduplicator
>>>      application.server =
>>>      bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092]
>>>      buffered.records.per.partition = 1000
>>>      cache.max.bytes.buffering = 10485760
>>>      client.id = cloud-stream-deduplicator
>>>      commit.interval.ms = 10000
>>>      connections.max.idle.ms = 540000
>>>      default.key.serde = class org.apache.kafka.common.serial
>>> ization.Serdes$ByteArraySerde
>>>      default.timestamp.extractor = class com.mytest.csd.kafka.messages.
>>> extractors.CloudStreamTimestampExtractor
>>>      default.value.serde = class org.apache.kafka.common.serial
>>> ization.Serdes$ByteArraySerde
>>>      key.serde = null
>>>      metadata.max.age.ms = 300000
>>>      metric.reporters = []
>>>      metrics.num.samples = 2
>>>      metrics.recording.level = INFO
>>>      metrics.sample.window.ms = 30000
>>>      num.standby.replicas = 1
>>>      num.stream.threads = 3
>>>      partition.grouper = class org.apache.kafka.streams.proce
>>> ssor.DefaultPartitionGrouper
>>>      poll.ms = 100
>>>      processing.guarantee = at_least_once
>>>      receive.buffer.bytes = 32768
>>>      reconnect.backoff.max.ms = 1000
>>>      reconnect.backoff.ms = 50
>>>      replication.factor = 1
>>>      request.timeout.ms = 300000
>>>      retry.backoff.ms = 100
>>>      rocksdb.config.setter = class com.mytest.csd.kafka.RocksDBConfig
>>>      security.protocol = PLAINTEXT
>>>      send.buffer.bytes = 131072
>>>      state.cleanup.delay.ms = 600000
>>>      state.dir = /root/kafka_state
>>>      timestamp.extractor = null
>>>      value.serde = null
>>>      windowstore.changelog.additional.retention.ms = 86400000
>>>      zookeeper.connect =
>>>
>>> INFO  [2017-09-15 07:20:54,243]
>>> org.apache.kafka.streams.processor.internals.StreamThread:
>>> stream-thread [cloud-stream-deduplicator-StreamThread-1] Creating
>>> consumer client
>>> INFO  [2017-09-15 07:20:54,259]
>>> org.apache.kafka.clients.consumer.ConsumerConfig:
>>> ConsumerConfig values:
>>>      auto.commit.interval.ms = 5000
>>>      auto.offset.reset = latest
>>>      bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092]
>>>      check.crcs = true
>>>      client.id = cloud-stream-deduplicator-StreamThread-1-consumer
>>>      connections.max.idle.ms = 540000
>>>      enable.auto.commit = false
>>>      exclude.internal.topics = true
>>>      fetch.max.bytes = 52428800
>>>      fetch.max.wait.ms = 500
>>>      fetch.min.bytes = 1
>>>      group.id = cloud-stream-deduplicator
>>>      heartbeat.interval.ms = 3000
>>>      interceptor.classes = null
>>>      internal.leave.group.on.close = false
>>>      isolation.level = read_uncommitted
>>>      key.deserializer = class org.apache.kafka.common.serial
>>> ization.ByteArrayDeserializer
>>>      max.partition.fetch.bytes = 1048576
>>>      max.poll.interval.ms = 2147483647
>>>      max.poll.records = 1000
>>>      metadata.max.age.ms = 300000
>>>      metric.reporters = []
>>>      metrics.num.samples = 2
>>>      metrics.recording.level = INFO
>>>      metrics.sample.window.ms = 30000
>>>      partition.assignment.strategy = [org.apache.kafka.streams.proc
>>> essor.internals.StreamPartitionAssignor]
>>>      receive.buffer.bytes = 65536
>>>      reconnect.backoff.max.ms = 1000
>>>      reconnect.backoff.ms = 50
>>>      request.timeout.ms = 300000
>>>      retry.backoff.ms = 100
>>>      sasl.jaas.config = null
>>>      sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>      sasl.kerberos.min.time.before.relogin = 60000
>>>      sasl.kerberos.service.name = null
>>>      sasl.kerberos.ticket.renew.jitter = 0.05
>>>      sasl.kerberos.ticket.renew.window.factor = 0.8
>>>      sasl.mechanism = GSSAPI
>>>      security.protocol = PLAINTEXT
>>>      send.buffer.bytes = 131072
>>>      session.timeout.ms = 180000
>>>      ssl.cipher.suites = null
>>>      ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>      ssl.endpoint.identification.algorithm = null
>>>      ssl.key.password = null
>>>      ssl.keymanager.algorithm = SunX509
>>>      ssl.keystore.location = null
>>>      ssl.keystore.password = null
>>>      ssl.keystore.type = JKS
>>>      ssl.protocol = TLS
>>>      ssl.provider = null
>>>      ssl.secure.random.implementation = null
>>>      ssl.trustmanager.algorithm = PKIX
>>>      ssl.truststore.location = null
>>>      ssl.truststore.password = null
>>>      ssl.truststore.type = JKS
>>>      value.deserializer = class org.apache.kafka.common.serial
>>> ization.ByteArrayDeserializer
>>>
>>> INFO  [2017-09-15 07:20:54,352]
>>> org.apache.kafka.common.utils.AppInfoParser:
>>> Kafka version : 0.11.0.0
>>> INFO  [2017-09-15 07:20:54,352]
>>> org.apache.kafka.common.utils.AppInfoParser:
>>> Kafka commitId : cb8625948210849f
>>> INFO  [2017-09-15 07:20:54,352]
>>> org.apache.kafka.streams.processor.internals.StreamThread:
>>> stream-thread [cloud-stream-deduplicator-StreamThread-1] Creating
>>> restore
>>> consumer client
>>> INFO  [2017-09-15 07:20:54,353]
>>> org.apache.kafka.clients.consumer.ConsumerConfig:
>>> ConsumerConfig values:
>>>      auto.commit.interval.ms = 5000
>>>      auto.offset.reset = latest
>>>      bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092]
>>>      check.crcs = true
>>>      client.id =
>>> cloud-stream-deduplicator-StreamThread-1-restore-consumer
>>>      connections.max.idle.ms = 540000
>>>      enable.auto.commit = false
>>>      exclude.internal.topics = true
>>>      fetch.max.bytes = 52428800
>>>      fetch.max.wait.ms = 500
>>>      fetch.min.bytes = 1
>>>      group.id =
>>>      heartbeat.interval.ms = 3000
>>>      interceptor.classes = null
>>>      internal.leave.group.on.close = false
>>>      isolation.level = read_uncommitted
>>>      key.deserializer = class org.apache.kafka.common.serial
>>> ization.ByteArrayDeserializer
>>>      max.partition.fetch.bytes = 1048576
>>>      max.poll.interval.ms = 2147483647
>>>      max.poll.records = 1000
>>>      metadata.max.age.ms = 300000
>>>      metric.reporters = []
>>>      metrics.num.samples = 2
>>>      metrics.recording.level = INFO
>>>      metrics.sample.window.ms = 30000
>>>      partition.assignment.strategy = [class
>>> org.apache.kafka.clients.consu
>>> mer.RangeAssignor]
>>>      receive.buffer.bytes = 65536
>>>      reconnect.backoff.max.ms = 1000
>>>      reconnect.backoff.ms = 50
>>>      request.timeout.ms = 300000
>>>      retry.backoff.ms = 100
>>>      sasl.jaas.config = null
>>>      sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>      sasl.kerberos.min.time.before.relogin = 60000
>>>      sasl.kerberos.service.name = null
>>>      sasl.kerberos.ticket.renew.jitter = 0.05
>>>      sasl.kerberos.ticket.renew.window.factor = 0.8
>>>      sasl.mechanism = GSSAPI
>>>      security.protocol = PLAINTEXT
>>>      send.buffer.bytes = 131072
>>>      session.timeout.ms = 180000
>>>      ssl.cipher.suites = null
>>>      ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>      ssl.endpoint.identification.algorithm = null
>>>      ssl.key.password = null
>>>      ssl.keymanager.algorithm = SunX509
>>>      ssl.keystore.location = null
>>>      ssl.keystore.password = null
>>>      ssl.keystore.type = JKS
>>>      ssl.protocol = TLS
>>>      ssl.provider = null
>>>      ssl.secure.random.implementation = null
>>>      ssl.trustmanager.algorithm = PKIX
>>>      ssl.truststore.location = null
>>>      ssl.truststore.password = null
>>>      ssl.truststore.type = JKS
>>>      value.deserializer = class org.apache.kafka.common.serial
>>> ization.ByteArrayDeserializer
>>>
>>> INFO  [2017-09-15 07:20:54,369]
>>> org.apache.kafka.common.utils.AppInfoParser:
>>> Kafka version : 0.11.0.0
>>> INFO  [2017-09-15 07:20:54,370]
>>> org.apache.kafka.common.utils.AppInfoParser:
>>> Kafka commitId : cb8625948210849f
>>> INFO  [2017-09-15 07:20:54,372]
>>> org.apache.kafka.streams.processor.internals.StreamThread:
>>> stream-thread [cloud-stream-deduplicator-StreamThread-1] State
>>> transition
>>> from CREATED to RUNNING.
>>> INFO  [2017-09-15 07:20:54,373]
>>> org.apache.kafka.streams.processor.internals.StreamThread:
>>> stream-thread [cloud-stream-deduplicator-StreamThread-2] Creating
>>> consumer client
>>> INFO  [2017-09-15 07:20:54,375]
>>> org.apache.kafka.clients.consumer.ConsumerConfig:
>>> ConsumerConfig values:
>>>      auto.commit.interval.ms = 5000
>>>      auto.offset.reset = latest
>>>      bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092]
>>>      check.crcs = true
>>>      client.id = cloud-stream-deduplicator-StreamThread-2-consumer
>>>      connections.max.idle.ms = 540000
>>>      enable.auto.commit = false
>>>      exclude.internal.topics = true
>>>      fetch.max.bytes = 52428800
>>>      fetch.max.wait.ms = 500
>>>      fetch.min.bytes = 1
>>>      group.id = cloud-stream-deduplicator
>>>      heartbeat.interval.ms = 3000
>>>      interceptor.classes = null
>>>      internal.leave.group.on.close = false
>>>      isolation.level = read_uncommitted
>>>      key.deserializer = class org.apache.kafka.common.serial
>>> ization.ByteArrayDeserializer
>>>      max.partition.fetch.bytes = 1048576
>>>      max.poll.interval.ms = 2147483647
>>>      max.poll.records = 1000
>>>      metadata.max.age.ms = 300000
>>>      metric.reporters = []
>>>      metrics.num.samples = 2
>>>      metrics.recording.level = INFO
>>>      metrics.sample.window.ms = 30000
>>>      partition.assignment.strategy = [org.apache.kafka.streams.proc
>>> essor.internals.StreamPartitionAssignor]
>>>      receive.buffer.bytes = 65536
>>>      reconnect.backoff.max.ms = 1000
>>>      reconnect.backoff.ms = 50
>>>      request.timeout.ms = 300000
>>>      retry.backoff.ms = 100
>>>      sasl.jaas.config = null
>>>      sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>      sasl.kerberos.min.time.before.relogin = 60000
>>>      sasl.kerberos.service.name = null
>>>      sasl.kerberos.ticket.renew.jitter = 0.05
>>>      sasl.kerberos.ticket.renew.window.factor = 0.8
>>>      sasl.mechanism = GSSAPI
>>>      security.protocol = PLAINTEXT
>>>      send.buffer.bytes = 131072
>>>      session.timeout.ms = 180000
>>>      ssl.cipher.suites = null
>>>      ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>      ssl.endpoint.identification.algorithm = null
>>>      ssl.key.password = null
>>>      ssl.keymanager.algorithm = SunX509
>>>      ssl.keystore.location = null
>>>      ssl.keystore.password = null
>>>      ssl.keystore.type = JKS
>>>      ssl.protocol = TLS
>>>      ssl.provider = null
>>>      ssl.secure.random.implementation = null
>>>      ssl.trustmanager.algorithm = PKIX
>>>      ssl.truststore.location = null
>>>      ssl.truststore.password = null
>>>      ssl.truststore.type = JKS
>>>      value.deserializer = class org.apache.kafka.common.serial
>>> ization.ByteArrayDeserializer
>>>
>>> INFO  [2017-09-15 07:20:54,392]
>>> org.apache.kafka.common.utils.AppInfoParser:
>>> Kafka version : 0.11.0.0
>>> INFO  [2017-09-15 07:20:54,395]
>>> org.apache.kafka.common.utils.AppInfoParser:
>>> Kafka commitId : cb8625948210849f
>>> INFO  [2017-09-15 07:20:54,396]
>>> org.apache.kafka.streams.processor.internals.StreamThread:
>>> stream-thread [cloud-stream-deduplicator-StreamThread-2] Creating
>>> restore
>>> consumer client
>>> INFO  [2017-09-15 07:20:54,396]
>>> org.apache.kafka.clients.consumer.ConsumerConfig:
>>> ConsumerConfig values:
>>>      auto.commit.interval.ms = 5000
>>>      auto.offset.reset = latest
>>>      bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092]
>>>      check.crcs = true
>>>      client.id =
>>> cloud-stream-deduplicator-StreamThread-2-restore-consumer
>>>      connections.max.idle.ms = 540000
>>>      enable.auto.commit = false
>>>      exclude.internal.topics = true
>>>      fetch.max.bytes = 52428800
>>>      fetch.max.wait.ms = 500
>>>      fetch.min.bytes = 1
>>>      group.id =
>>>      heartbeat.interval.ms = 3000
>>>      interceptor.classes = null
>>>      internal.leave.group.on.close = false
>>>      isolation.level = read_uncommitted
>>>      key.deserializer = class org.apache.kafka.common.serial
>>> ization.ByteArrayDeserializer
>>>      max.partition.fetch.bytes = 1048576
>>>      max.poll.interval.ms = 2147483647
>>>      max.poll.records = 1000
>>>      metadata.max.age.ms = 300000
>>>      metric.reporters = []
>>>      metrics.num.samples = 2
>>>      metrics.recording.level = INFO
>>>      metrics.sample.window.ms = 30000
>>>      partition.assignment.strategy = [class
>>> org.apache.kafka.clients.consu
>>> mer.RangeAssignor]
>>>      receive.buffer.bytes = 65536
>>>      reconnect.backoff.max.ms = 1000
>>>      reconnect.backoff.ms = 50
>>>      request.timeout.ms = 300000
>>>      retry.backoff.ms = 100
>>>      sasl.jaas.config = null
>>>      sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>      sasl.kerberos.min.time.before.relogin = 60000
>>>      sasl.kerberos.service.name = null
>>>      sasl.kerberos.ticket.renew.jitter = 0.05
>>>      sasl.kerberos.ticket.renew.window.factor = 0.8
>>>      sasl.mechanism = GSSAPI
>>>      security.protocol = PLAINTEXT
>>>      send.buffer.bytes = 131072
>>>      session.timeout.ms = 180000
>>>      ssl.cipher.suites = null
>>>      ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>      ssl.endpoint.identification.algorithm = null
>>>      ssl.key.password = null
>>>      ssl.keymanager.algorithm = SunX509
>>>      ssl.keystore.location = null
>>>      ssl.keystore.password = null
>>>      ssl.keystore.type = JKS
>>>      ssl.protocol = TLS
>>>      ssl.provider = null
>>>      ssl.secure.random.implementation = null
>>>      ssl.trustmanager.algorithm = PKIX
>>>      ssl.truststore.location = null
>>>      ssl.truststore.password = null
>>>      ssl.truststore.type = JKS
>>>      value.deserializer = class org.apache.kafka.common.serial
>>> ization.ByteArrayDeserializer
>>>
>>> INFO  [2017-09-15 07:20:54,407]
>>> org.apache.kafka.common.utils.AppInfoParser:
>>> Kafka version : 0.11.0.0
>>> INFO  [2017-09-15 07:20:54,407]
>>> org.apache.kafka.common.utils.AppInfoParser:
>>> Kafka commitId : cb8625948210849f
>>> INFO  [2017-09-15 07:20:54,408]
>>> org.apache.kafka.streams.processor.internals.StreamThread:
>>> stream-thread [cloud-stream-deduplicator-StreamThread-2] State
>>> transition
>>> from CREATED to RUNNING.
>>> INFO  [2017-09-15 07:20:54,414]
>>> org.apache.kafka.streams.processor.internals.StreamThread:
>>> stream-thread [cloud-stream-deduplicator-StreamThread-3] Creating
>>> consumer client
>>> INFO  [2017-09-15 07:20:54,417]
>>> org.apache.kafka.clients.consumer.ConsumerConfig:
>>> ConsumerConfig values:
>>>      auto.commit.interval.ms = 5000
>>>      auto.offset.reset = latest
>>>      bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092]
>>>      check.crcs = true
>>>      client.id = cloud-stream-deduplicator-StreamThread-3-consumer
>>>      connections.max.idle.ms = 540000
>>>      enable.auto.commit = false
>>>      exclude.internal.topics = true
>>>      fetch.max.bytes = 52428800
>>>      fetch.max.wait.ms = 500
>>>      fetch.min.bytes = 1
>>>      group.id = cloud-stream-deduplicator
>>>      heartbeat.interval.ms = 3000
>>>      interceptor.classes = null
>>>      internal.leave.group.on.close = false
>>>      isolation.level = read_uncommitted
>>>      key.deserializer = class org.apache.kafka.common.serial
>>> ization.ByteArrayDeserializer
>>>      max.partition.fetch.bytes = 1048576
>>>      max.poll.interval.ms = 2147483647
>>>      max.poll.records = 1000
>>>      metadata.max.age.ms = 300000
>>>      metric.reporters = []
>>>      metrics.num.samples = 2
>>>      metrics.recording.level = INFO
>>>      metrics.sample.window.ms = 30000
>>>      partition.assignment.strategy = [org.apache.kafka.streams.proc
>>> essor.internals.StreamPartitionAssignor]
>>>      receive.buffer.bytes = 65536
>>>      reconnect.backoff.max.ms = 1000
>>>      reconnect.backoff.ms = 50
>>>      request.timeout.ms = 300000
>>>      retry.backoff.ms = 100
>>>      sasl.jaas.config = null
>>>      sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>      sasl.kerberos.min.time.before.relogin = 60000
>>>      sasl.kerberos.service.name = null
>>>      sasl.kerberos.ticket.renew.jitter = 0.05
>>>      sasl.kerberos.ticket.renew.window.factor = 0.8
>>>      sasl.mechanism = GSSAPI
>>>      security.protocol = PLAINTEXT
>>>      send.buffer.bytes = 131072
>>>      session.timeout.ms = 180000
>>>      ssl.cipher.suites = null
>>>      ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>      ssl.endpoint.identification.algorithm = null
>>>      ssl.key.password = null
>>>      ssl.keymanager.algorithm = SunX509
>>>      ssl.keystore.location = null
>>>      ssl.keystore.password = null
>>>      ssl.keystore.type = JKS
>>>      ssl.protocol = TLS
>>>      ssl.provider = null
>>>      ssl.secure.random.implementation = null
>>>      ssl.trustmanager.algorithm = PKIX
>>>      ssl.truststore.location = null
>>>      ssl.truststore.password = null
>>>      ssl.truststore.type = JKS
>>>      value.deserializer = class org.apache.kafka.common.serial
>>> ization.ByteArrayDeserializer
>>>
>>> INFO  [2017-09-15 07:20:54,447]
>>> org.apache.kafka.common.utils.AppInfoParser:
>>> Kafka version : 0.11.0.0
>>> INFO  [2017-09-15 07:20:54,451]
>>> org.apache.kafka.common.utils.AppInfoParser:
>>> Kafka commitId : cb8625948210849f
>>> INFO  [2017-09-15 07:20:54,452]
>>> org.apache.kafka.streams.processor.internals.StreamThread:
>>> stream-thread [cloud-stream-deduplicator-StreamThread-3] Creating
>>> restore
>>> consumer client
>>> INFO  [2017-09-15 07:20:54,452]
>>> org.apache.kafka.clients.consumer.ConsumerConfig:
>>> ConsumerConfig values:
>>>      auto.commit.interval.ms = 5000
>>>      auto.offset.reset = latest
>>>      bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092]
>>>      check.crcs = true
>>>      client.id =
>>> cloud-stream-deduplicator-StreamThread-3-restore-consumer
>>>      connections.max.idle.ms = 540000
>>>      enable.auto.commit = false
>>>      exclude.internal.topics = true
>>>      fetch.max.bytes = 52428800
>>>      fetch.max.wait.ms = 500
>>>      fetch.min.bytes = 1
>>>      group.id =
>>>      heartbeat.interval.ms = 3000
>>>      interceptor.classes = null
>>>      internal.leave.group.on.close = false
>>>      isolation.level = read_uncommitted
>>>      key.deserializer = class org.apache.kafka.common.serial
>>> ization.ByteArrayDeserializer
>>>      max.partition.fetch.bytes = 1048576
>>>      max.poll.interval.ms = 2147483647
>>>      max.poll.records = 1000
>>>      metadata.max.age.ms = 300000
>>>      metric.reporters = []
>>>      metrics.num.samples = 2
>>>      metrics.recording.level = INFO
>>>      metrics.sample.window.ms = 30000
>>>      partition.assignment.strategy = [class
>>> org.apache.kafka.clients.consu
>>> mer.RangeAssignor]
>>>      receive.buffer.bytes = 65536
>>>      reconnect.backoff.max.ms = 1000
>>>      reconnect.backoff.ms = 50
>>>      request.timeout.ms = 300000
>>>      retry.backoff.ms = 100
>>>      sasl.jaas.config = null
>>>      sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>      sasl.kerberos.min.time.before.relogin = 60000
>>>      sasl.kerberos.service.name = null
>>>      sasl.kerberos.ticket.renew.jitter = 0.05
>>>      sasl.kerberos.ticket.renew.window.factor = 0.8
>>>      sasl.mechanism = GSSAPI
>>>      security.protocol = PLAINTEXT
>>>      send.buffer.bytes = 131072
>>>      session.timeout.ms = 180000
>>>      ssl.cipher.suites = null
>>>      ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>      ssl.endpoint.identification.algorithm = null
>>>      ssl.key.password = null
>>>      ssl.keymanager.algorithm = SunX509
>>>      ssl.keystore.location = null
>>>      ssl.keystore.password = null
>>>      ssl.keystore.type = JKS
>>>      ssl.protocol = TLS
>>>      ssl.provider = null
>>>      ssl.secure.random.implementation = null
>>>      ssl.trustmanager.algorithm = PKIX
>>>      ssl.truststore.location = null
>>>      ssl.truststore.password = null
>>>      ssl.truststore.type = JKS
>>>      value.deserializer = class org.apache.kafka.common.serial
>>> ization.ByteArrayDeserializer
>>>
>>> INFO  [2017-09-15 07:20:54,455]
>>> org.apache.kafka.common.utils.AppInfoParser:
>>> Kafka version : 0.11.0.0
>>> INFO  [2017-09-15 07:20:54,459]
>>> org.apache.kafka.common.utils.AppInfoParser:
>>> Kafka commitId : cb8625948210849f
>>> INFO  [2017-09-15 07:20:54,460]
>>> org.apache.kafka.streams.processor.internals.StreamThread:
>>> stream-thread [cloud-stream-deduplicator-StreamThread-3] State
>>> transition
>>> from CREATED to RUNNING.
>>> INFO  [2017-09-15 07:20:54,672] org.apache.kafka.streams.KafkaStreams:
>>> stream-client [cloud-stream-deduplicator] State transition from
>>> CREATED to
>>> RUNNING.
>>> INFO  [2017-09-15 07:20:54,679]
>>> org.apache.kafka.streams.processor.internals.StreamThread:
>>> stream-thread [cloud-stream-deduplicator-StreamThread-1] Starting
>>> INFO  [2017-09-15 07:20:54,690]
>>> org.apache.kafka.streams.processor.internals.StreamThread:
>>> stream-thread [cloud-stream-deduplicator-StreamThread-2] Starting
>>> INFO  [2017-09-15 07:20:54,691] org.apache.kafka.streams.KafkaStreams:
>>> stream-client [cloud-stream-deduplicator] Started Kafka Stream process
>>> INFO  [2017-09-15 07:20:54,691] com.mytest.csd.kafka.KafkaDeduplicator:
>>> Stream topology:
>>> KafkaStreams processID: 1ec2c01e-9738-4ce4-b540-8bd0820d2ea6
>>>      StreamsThread appId: cloud-stream-deduplicator
>>>          StreamsThread clientId: cloud-stream-deduplicator
>>>          StreamsThread threadId:
>>> cloud-stream-deduplicator-StreamThread-1
>>>          Active tasks:
>>>          Standby tasks:
>>>
>>>      StreamsThread appId: cloud-stream-deduplicator
>>>          StreamsThread clientId: cloud-stream-deduplicator
>>>          StreamsThread threadId:
>>> cloud-stream-deduplicator-StreamThread-2
>>>          Active tasks:
>>>          Standby tasks:
>>>
>>>      StreamsThread appId: cloud-stream-deduplicator
>>>          StreamsThread clientId: cloud-stream-deduplicator
>>>          StreamsThread threadId:
>>> cloud-stream-deduplicator-StreamThread-3
>>>          Active tasks:
>>>          Standby tasks:
>>>
>>>
> 
