bq. at com.mytest.csd.kafka.KafkaDeduplicator.getQuotaStore(KafkaDeduplicator.java:147)
Can you show us the relevant code snippet for the above method?

On Fri, Sep 15, 2017 at 2:20 AM, Jari Väimölä <jari.vain...@lekane.com> wrote:

> Hello all,
> I have an Apache Kafka Streams application running in a Docker container. It
> writes to three output topics.
> In my environment there are two brokers and one ZooKeeper, all running on
> different hosts.
> After starting my stream application, the following error logs are printed:
>
> INFO [2017-09-15 07:20:55,389] org.eclipse.jetty.server.Server: Started @3845ms
> INFO [2017-09-15 07:21:00,003] com.mytest.csd.jobs.InstancesJob: Starting InstancesJob, writing to my.instances.dedup
> INFO [2017-09-15 07:21:00,003] com.mytest.csd.kafka.KafkaDeduplicator: Current state: RUNNING
> ERROR [2017-09-15 07:21:00,007] com.mytest.csd.jobs.InstancesJob: Error running InstancesJob
> ! org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, instance-details-table, may have migrated to another instance.
> ! at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
> ! at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:728)
> ! at com.mytest.csd.kafka.KafkaDeduplicator.getInstanceDetailsStore(KafkaDeduplicator.java:143)
> ! at com.mytest.csd.jobs.InstancesJob.doJob(InstancesJob.java:66)
> ! at de.spinscale.dropwizard.jobs.Job.execute(Job.java:33)
> ! at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
> ! at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
> INFO [2017-09-15 07:21:00,012] com.mytest.csd.jobs.CloudDetailsJob: Starting CloudDetailsJob, writing to my.clouds.dedup
> INFO [2017-09-15 07:21:00,012] com.mytest.csd.kafka.KafkaDeduplicator: Current state: RUNNING
> ERROR [2017-09-15 07:21:00,012] com.mytest.csd.jobs.CloudDetailsJob: Error running CloudDetailsJob
> ! org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, cloud-details-table, may have migrated to another instance.
> ! at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
> ! at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:728)
> ! at com.mytest.csd.kafka.KafkaDeduplicator.getCloudDetailsStore(KafkaDeduplicator.java:151)
> ! at com.mytest.csd.jobs.CloudDetailsJob.doJob(CloudDetailsJob.java:56)
> ! at de.spinscale.dropwizard.jobs.Job.execute(Job.java:33)
> ! at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
> ! at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
> INFO [2017-09-15 07:21:00,013] com.mytest.csd.jobs.QuotaJob: Starting QuotaJob, writing to my.quota
> INFO [2017-09-15 07:21:00,013] com.mytest.csd.kafka.KafkaDeduplicator: Current state: RUNNING
> ERROR [2017-09-15 07:21:00,013] com.mytest.csd.jobs.QuotaJob: Error running QuotaJob
> ! org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, quota-table, may have migrated to another instance.
> ! at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
> ! at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:728)
> ! at com.mytest.csd.kafka.KafkaDeduplicator.getQuotaStore(KafkaDeduplicator.java:147)
> ! at com.mytest.csd.jobs.QuotaJob.doJob(QuotaJob.java:56)
> ! at de.spinscale.dropwizard.jobs.Job.execute(Job.java:33)
> ! at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
> ! at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
>
> Then the same three error logs are printed periodically, at one-minute
> intervals, until I stop my stream application.
> The same happens after I start my stream application again.
>
> What might cause this issue?
>
> Thanks and Best Regards,
> Jari
>
>
> Here is the Kafka configuration of my stream application:
>
> INFO [2017-09-15 07:20:54,099] org.apache.kafka.clients.producer.ProducerConfig: ProducerConfig values:
> acks = all
> batch.size = 16384
> bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092]
> buffer.memory = 33554432
> client.id =
> compression.type = none
> connections.max.idle.ms = 540000
> enable.idempotence = false
> interceptor.classes = null
> key.serializer = class org.apache.kafka.common.serialization.StringSerializer
> linger.ms = 0
> max.block.ms = 60000
> max.in.flight.requests.per.connection = 5
> max.request.size = 1048576
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 30000
> retries = 0
> retry.backoff.ms = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> transaction.timeout.ms = 60000
> transactional.id = null
> value.serializer = class org.apache.kafka.common.serialization.StringSerializer
>
> INFO [2017-09-15 07:20:54,156] org.apache.kafka.common.utils.AppInfoParser: Kafka version : 0.11.0.0
> INFO [2017-09-15 07:20:54,156] org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : cb8625948210849f
> INFO [2017-09-15 07:20:54,157] com.mytest.csd.kafka.KafkaDeduplicator: Initializing StreamRunner...
> INFO [2017-09-15 07:20:54,169] org.apache.kafka.streams.StreamsConfig: StreamsConfig values:
> application.id = cloud-stream-deduplicator
> application.server =
> bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092]
> buffered.records.per.partition = 1000
> cache.max.bytes.buffering = 10485760
> client.id = cloud-stream-deduplicator
> commit.interval.ms = 10000
> connections.max.idle.ms = 540000
> default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> default.timestamp.extractor = class com.mytest.csd.kafka.messages.extractors.CloudStreamTimestampExtractor
> default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> key.serde = null
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> num.standby.replicas = 1
> num.stream.threads = 3
> partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
> poll.ms = 100
> processing.guarantee = at_least_once
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> replication.factor = 1
> request.timeout.ms = 300000
> retry.backoff.ms = 100
> rocksdb.config.setter = class com.mytest.csd.kafka.RocksDBConfig
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 600000
> state.dir = /root/kafka_state
> timestamp.extractor = null
> value.serde = null
> windowstore.changelog.additional.retention.ms = 86400000
> zookeeper.connect =
>
> INFO [2017-09-15 07:20:54,243] org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [cloud-stream-deduplicator-StreamThread-1] Creating consumer client
> INFO [2017-09-15 07:20:54,259] org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = latest
> bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092]
> check.crcs = true
> client.id = cloud-stream-deduplicator-StreamThread-1-consumer
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = cloud-stream-deduplicator
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = false
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 2147483647
> max.poll.records = 1000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 300000
> retry.backoff.ms = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> session.timeout.ms = 180000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>
> INFO [2017-09-15 07:20:54,352] org.apache.kafka.common.utils.AppInfoParser: Kafka version : 0.11.0.0
> INFO [2017-09-15 07:20:54,352] org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : cb8625948210849f
> INFO [2017-09-15 07:20:54,352] org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [cloud-stream-deduplicator-StreamThread-1] Creating restore consumer client
> INFO [2017-09-15 07:20:54,353] org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = latest
> bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092]
> check.crcs = true
> client.id = cloud-stream-deduplicator-StreamThread-1-restore-consumer
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id =
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = false
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 2147483647
> max.poll.records = 1000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 300000
> retry.backoff.ms = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> session.timeout.ms = 180000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>
> INFO [2017-09-15 07:20:54,369] org.apache.kafka.common.utils.AppInfoParser: Kafka version : 0.11.0.0
> INFO [2017-09-15 07:20:54,370] org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : cb8625948210849f
> INFO [2017-09-15 07:20:54,372] org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [cloud-stream-deduplicator-StreamThread-1] State transition from CREATED to RUNNING.
> INFO [2017-09-15 07:20:54,373] org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [cloud-stream-deduplicator-StreamThread-2] Creating consumer client
> INFO [2017-09-15 07:20:54,375] org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = latest
> bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092]
> check.crcs = true
> client.id = cloud-stream-deduplicator-StreamThread-2-consumer
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = cloud-stream-deduplicator
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = false
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 2147483647
> max.poll.records = 1000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 300000
> retry.backoff.ms = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> session.timeout.ms = 180000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>
> INFO [2017-09-15 07:20:54,392] org.apache.kafka.common.utils.AppInfoParser: Kafka version : 0.11.0.0
> INFO [2017-09-15 07:20:54,395] org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : cb8625948210849f
> INFO [2017-09-15 07:20:54,396] org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [cloud-stream-deduplicator-StreamThread-2] Creating restore consumer client
> INFO [2017-09-15 07:20:54,396] org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = latest
> bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092]
> check.crcs = true
> client.id = cloud-stream-deduplicator-StreamThread-2-restore-consumer
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id =
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = false
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 2147483647
> max.poll.records = 1000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 300000
> retry.backoff.ms = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> session.timeout.ms = 180000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>
> INFO [2017-09-15 07:20:54,407] org.apache.kafka.common.utils.AppInfoParser: Kafka version : 0.11.0.0
> INFO [2017-09-15 07:20:54,407] org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : cb8625948210849f
> INFO [2017-09-15 07:20:54,408] org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [cloud-stream-deduplicator-StreamThread-2] State transition from CREATED to RUNNING.
> INFO [2017-09-15 07:20:54,414] org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [cloud-stream-deduplicator-StreamThread-3] Creating consumer client
> INFO [2017-09-15 07:20:54,417] org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = latest
> bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092]
> check.crcs = true
> client.id = cloud-stream-deduplicator-StreamThread-3-consumer
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = cloud-stream-deduplicator
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = false
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 2147483647
> max.poll.records = 1000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 300000
> retry.backoff.ms = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> session.timeout.ms = 180000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>
> INFO [2017-09-15 07:20:54,447] org.apache.kafka.common.utils.AppInfoParser: Kafka version : 0.11.0.0
> INFO [2017-09-15 07:20:54,451] org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : cb8625948210849f
> INFO [2017-09-15 07:20:54,452] org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [cloud-stream-deduplicator-StreamThread-3] Creating restore consumer client
> INFO [2017-09-15 07:20:54,452] org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = latest
> bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092]
> check.crcs = true
> client.id = cloud-stream-deduplicator-StreamThread-3-restore-consumer
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id =
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = false
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 2147483647
> max.poll.records = 1000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 300000
> retry.backoff.ms = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> session.timeout.ms = 180000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>
> INFO [2017-09-15 07:20:54,455] org.apache.kafka.common.utils.AppInfoParser: Kafka version : 0.11.0.0
> INFO [2017-09-15 07:20:54,459] org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : cb8625948210849f
> INFO [2017-09-15 07:20:54,460] org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [cloud-stream-deduplicator-StreamThread-3] State transition from CREATED to RUNNING.
> INFO [2017-09-15 07:20:54,672] org.apache.kafka.streams.KafkaStreams: stream-client [cloud-stream-deduplicator] State transition from CREATED to RUNNING.
> INFO [2017-09-15 07:20:54,679] org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [cloud-stream-deduplicator-StreamThread-1] Starting
> INFO [2017-09-15 07:20:54,690] org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [cloud-stream-deduplicator-StreamThread-2] Starting
> INFO [2017-09-15 07:20:54,691] org.apache.kafka.streams.KafkaStreams: stream-client [cloud-stream-deduplicator] Started Kafka Stream process
> INFO [2017-09-15 07:20:54,691] com.mytest.csd.kafka.KafkaDeduplicator: Stream topology:
> KafkaStreams processID: 1ec2c01e-9738-4ce4-b540-8bd0820d2ea6
> StreamsThread appId: cloud-stream-deduplicator
> StreamsThread clientId: cloud-stream-deduplicator
> StreamsThread threadId: cloud-stream-deduplicator-StreamThread-1
> Active tasks:
> Standby tasks:
>
> StreamsThread appId: cloud-stream-deduplicator
> StreamsThread clientId: cloud-stream-deduplicator
> StreamsThread threadId: cloud-stream-deduplicator-StreamThread-2
> Active tasks:
> Standby tasks:
>
> StreamsThread appId: cloud-stream-deduplicator
> StreamsThread clientId: cloud-stream-deduplicator
> StreamsThread threadId: cloud-stream-deduplicator-StreamThread-3
> Active tasks:
> Standby tasks:
>
>
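
On the InvalidStateStoreException in the quoted logs: the Kafka Streams documentation describes this exception as one that can be thrown while a store is not yet queryable, for example before tasks have been assigned or during a rebalance. The topology dump above lists no active tasks for any thread, so that may be what is happening here, although without the code for getQuotaStore() this is only a guess. A minimal sketch of the usual workaround, retrying the lookup with a pause between attempts, could look like the following. StoreRetry and waitFor are hypothetical names and not part of Kafka Streams; the sketch uses only the JDK so that it runs on its own, and it catches RuntimeException where real code would catch InvalidStateStoreException only.

```java
import java.util.function.Supplier;

/** Hypothetical helper for illustration; not part of Kafka Streams. */
final class StoreRetry {
    private StoreRetry() {}

    /**
     * Calls lookup until it returns without throwing, sleeping backoffMs
     * between attempts. Rethrows the last failure after maxAttempts tries.
     */
    static <T> T waitFor(Supplier<T> lookup, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                // Assumption: in a Streams application this would be narrowed
                // to InvalidStateStoreException, which is a RuntimeException.
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for store", ie);
                }
            }
        }
        throw last;
    }
}
```

In the application, the supplier would presumably wrap the existing lookup, something like () -> streams.store("quota-table", QueryableStoreTypes.keyValueStore()), where streams stands for the application's KafkaStreams instance. If the store never becomes available, as the repeating one-minute errors might suggest, retrying alone will not help and the cause lies elsewhere.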