Hi, in case of a rebalance, partitions are reassigned and thus (shards) of a store might move from one instance/thread to another. This could potentially happen anytime, and you need to rediscover the shard/store afterwards. Thus, your code must catch this exception and you can retry the query after store rediscovery.
-Matthias On 9/15/17 3:44 AM, Jari Väimölä wrote: > Hello, > > Here is the code snipplet. > > 145 > 146 public ReadOnlyKeyValueStore<String,QuotasMsg> getQuotaStore() { > 147 return this.runner.streams.store("quota-table", > QueryableStoreTypes.keyValueStore()); > 148 } > 149 > > Thanks, > Jari > > > On 15.09.2017 12:30, Ted Yu wrote: >> bq. at com.mytest.csd.kafka.KafkaDeduplicator.getQuotaStore(KafkaDe >> duplicator.java:147) >> >> Can you show us relevant code snippet for the above method ? >> >> On Fri, Sep 15, 2017 at 2:20 AM, Jari Väimölä <jari.vain...@lekane.com> >> wrote: >> >>> Hello all, >>> I have an apache kafka stream application running in docker >>> container. It >>> writes to three output topics. >>> In my environment there are two brokers and one zookeeper, all >>> running on >>> different hosts. >>> After starting my stream application following error logs are printed: >>> >>> INFO [2017-09-15 07:20:55,389] org.eclipse.jetty.server.Server: Started >>> @3845ms >>> INFO [2017-09-15 07:21:00,003] com.mytest.csd.jobs.InstancesJob: >>> Starting InstancesJob, writing to my.instances.dedup >>> INFO [2017-09-15 07:21:00,003] com.mytest.csd.kafka.KafkaDeduplicator: >>> Current state: RUNNING >>> ERROR [2017-09-15 07:21:00,007] com.mytest.csd.jobs.InstancesJob: Error >>> running InstancesJob >>> ! org.apache.kafka.streams.errors.InvalidStateStoreException: the state >>> store, instance-details-table, may have migrated to another instance. >>> ! at org.apache.kafka.streams.state.internals.QueryableStoreProvi >>> der.getStore(QueryableStoreProvider.java:60) >>> ! at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:728) >>> ! at com.mytest.csd.kafka.KafkaDeduplicator.getInstanceDetailsSto >>> re(KafkaDeduplicator.java:143) >>> ! at com.mytest.csd.jobs.InstancesJob.doJob(InstancesJob.java:66) >>> ! at de.spinscale.dropwizard.jobs.Job.execute(Job.java:33) >>> ! at org.quartz.core.JobRunShell.run(JobRunShell.java:202) >>> ! at org.quartz.simpl.SimpleThreadPool$WorkerThread.run( >>> SimpleThreadPool.java:573) >>> INFO [2017-09-15 07:21:00,012] com.mytest.csd.jobs.CloudDetailsJob: >>> Starting CloudDetailsJob, writing to my.clouds.dedup >>> INFO [2017-09-15 07:21:00,012] com.mytest.csd.kafka.KafkaDeduplicator: >>> Current state: RUNNING >>> ERROR [2017-09-15 07:21:00,012] com.mytest.csd.jobs.CloudDetailsJob: >>> Error running CloudDetailsJob >>> ! org.apache.kafka.streams.errors.InvalidStateStoreException: the state >>> store, cloud-details-table, may have migrated to another instance. >>> ! at org.apache.kafka.streams.state.internals.QueryableStoreProvi >>> der.getStore(QueryableStoreProvider.java:60) >>> ! at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:728) >>> ! at com.mytest.csd.kafka.KafkaDeduplicator.getCloudDetailsStore( >>> KafkaDeduplicator.java:151) >>> ! at com.mytest.csd.jobs.CloudDetailsJob.doJob(CloudDetailsJob.java:56) >>> ! at de.spinscale.dropwizard.jobs.Job.execute(Job.java:33) >>> ! at org.quartz.core.JobRunShell.run(JobRunShell.java:202) >>> ! at org.quartz.simpl.SimpleThreadPool$WorkerThread.run( >>> SimpleThreadPool.java:573) >>> INFO [2017-09-15 07:21:00,013] com.mytest.csd.jobs.QuotaJob: Starting >>> QuotaJob, writing to my.quota >>> INFO [2017-09-15 07:21:00,013] com.mytest.csd.kafka.KafkaDeduplicator: >>> Current state: RUNNING >>> ERROR [2017-09-15 07:21:00,013] com.mytest.csd.jobs.QuotaJob: Error >>> running QuotaJob >>> ! org.apache.kafka.streams.errors.InvalidStateStoreException: the state >>> store, quota-table, may have migrated to another instance. >>> ! at org.apache.kafka.streams.state.internals.QueryableStoreProvi >>> der.getStore(QueryableStoreProvider.java:60) >>> ! at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:728) >>> ! at com.mytest.csd.kafka.KafkaDeduplicator.getQuotaStore(KafkaDe >>> duplicator.java:147) >>> ! at com.mytest.csd.jobs.QuotaJob.doJob(QuotaJob.java:56) >>> ! at de.spinscale.dropwizard.jobs.Job.execute(Job.java:33) >>> ! at org.quartz.core.JobRunShell.run(JobRunShell.java:202) >>> ! at org.quartz.simpl.SimpleThreadPool$WorkerThread.run( >>> SimpleThreadPool.java:573) >>> >>> Then periodically 1 minute interval same three error logs are printed, >>> never ending until I stop my stream application. >>> Same happens after starting again my stream application. >>> >>> What might cause this issue? >>> >>> Thanks and Best Regards, >>> Jari >>> >>> >>> Here is the kafka configuration of my stream application: >>> >>> INFO [2017-09-15 07:20:54,099] >>> org.apache.kafka.clients.producer.ProducerConfig: >>> ProducerConfig values: >>> acks = all >>> batch.size = 16384 >>> bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092] >>> buffer.memory = 33554432 >>> client.id = >>> compression.type = none >>> connections.max.idle.ms = 540000 >>> enable.idempotence = false >>> interceptor.classes = null >>> key.serializer = class org.apache.kafka.common.serial >>> ization.StringSerializer >>> linger.ms = 0 >>> max.block.ms = 60000 >>> max.in.flight.requests.per.connection = 5 >>> max.request.size = 1048576 >>> metadata.max.age.ms = 300000 >>> metric.reporters = [] >>> metrics.num.samples = 2 >>> metrics.recording.level = INFO >>> metrics.sample.window.ms = 30000 >>> partitioner.class = class org.apache.kafka.clients.produ >>> cer.internals.DefaultPartitioner >>> receive.buffer.bytes = 32768 >>> reconnect.backoff.max.ms = 1000 >>> reconnect.backoff.ms = 50 >>> request.timeout.ms = 30000 >>> retries = 0 >>> retry.backoff.ms = 100 >>> sasl.jaas.config = null >>> sasl.kerberos.kinit.cmd = /usr/bin/kinit >>> sasl.kerberos.min.time.before.relogin = 60000 >>> sasl.kerberos.service.name = null >>> sasl.kerberos.ticket.renew.jitter = 0.05 >>> sasl.kerberos.ticket.renew.window.factor = 0.8 >>> sasl.mechanism = GSSAPI >>> security.protocol = PLAINTEXT >>> send.buffer.bytes = 131072 >>> ssl.cipher.suites = null >>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] >>> ssl.endpoint.identification.algorithm = null >>> ssl.key.password = null >>> ssl.keymanager.algorithm = SunX509 >>> ssl.keystore.location = null >>> ssl.keystore.password = null >>> ssl.keystore.type = JKS >>> ssl.protocol = TLS >>> ssl.provider = null >>> ssl.secure.random.implementation = null >>> ssl.trustmanager.algorithm = PKIX >>> ssl.truststore.location = null >>> ssl.truststore.password = null >>> ssl.truststore.type = JKS >>> transaction.timeout.ms = 60000 >>> transactional.id = null >>> value.serializer = class org.apache.kafka.common.serial >>> ization.StringSerializer >>> >>> INFO [2017-09-15 07:20:54,156] >>> org.apache.kafka.common.utils.AppInfoParser: >>> Kafka version : 0.11.0.0 >>> INFO [2017-09-15 07:20:54,156] >>> org.apache.kafka.common.utils.AppInfoParser: >>> Kafka commitId : cb8625948210849f >>> INFO [2017-09-15 07:20:54,157] com.mytest.csd.kafka.KafkaDeduplicator: >>> Initializing StreamRunner... >>> INFO [2017-09-15 07:20:54,169] org.apache.kafka.streams.StreamsConfig: >>> StreamsConfig values: >>> application.id = cloud-stream-deduplicator >>> application.server = >>> bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092] >>> buffered.records.per.partition = 1000 >>> cache.max.bytes.buffering = 10485760 >>> client.id = cloud-stream-deduplicator >>> commit.interval.ms = 10000 >>> connections.max.idle.ms = 540000 >>> default.key.serde = class org.apache.kafka.common.serial >>> ization.Serdes$ByteArraySerde >>> default.timestamp.extractor = class com.mytest.csd.kafka.messages. >>> extractors.CloudStreamTimestampExtractor >>> default.value.serde = class org.apache.kafka.common.serial >>> ization.Serdes$ByteArraySerde >>> key.serde = null >>> metadata.max.age.ms = 300000 >>> metric.reporters = [] >>> metrics.num.samples = 2 >>> metrics.recording.level = INFO >>> metrics.sample.window.ms = 30000 >>> num.standby.replicas = 1 >>> num.stream.threads = 3 >>> partition.grouper = class org.apache.kafka.streams.proce >>> ssor.DefaultPartitionGrouper >>> poll.ms = 100 >>> processing.guarantee = at_least_once >>> receive.buffer.bytes = 32768 >>> reconnect.backoff.max.ms = 1000 >>> reconnect.backoff.ms = 50 >>> replication.factor = 1 >>> request.timeout.ms = 300000 >>> retry.backoff.ms = 100 >>> rocksdb.config.setter = class com.mytest.csd.kafka.RocksDBConfig >>> security.protocol = PLAINTEXT >>> send.buffer.bytes = 131072 >>> state.cleanup.delay.ms = 600000 >>> state.dir = /root/kafka_state >>> timestamp.extractor = null >>> value.serde = null >>> windowstore.changelog.additional.retention.ms = 86400000 >>> zookeeper.connect = >>> >>> INFO [2017-09-15 07:20:54,243] >>> org.apache.kafka.streams.processor.internals.StreamThread: >>> stream-thread [cloud-stream-deduplicator-StreamThread-1] Creating >>> consumer client >>> INFO [2017-09-15 07:20:54,259] >>> org.apache.kafka.clients.consumer.ConsumerConfig: >>> ConsumerConfig values: >>> auto.commit.interval.ms = 5000 >>> auto.offset.reset = latest >>> bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092] >>> check.crcs = true >>> client.id = cloud-stream-deduplicator-StreamThread-1-consumer >>> connections.max.idle.ms = 540000 >>> enable.auto.commit = false >>> exclude.internal.topics = true >>> fetch.max.bytes = 52428800 >>> fetch.max.wait.ms = 500 >>> fetch.min.bytes = 1 >>> group.id = cloud-stream-deduplicator >>> heartbeat.interval.ms = 3000 >>> interceptor.classes = null >>> internal.leave.group.on.close = false >>> isolation.level = read_uncommitted >>> key.deserializer = class org.apache.kafka.common.serial >>> ization.ByteArrayDeserializer >>> max.partition.fetch.bytes = 1048576 >>> max.poll.interval.ms = 2147483647 >>> max.poll.records = 1000 >>> metadata.max.age.ms = 300000 >>> metric.reporters = [] >>> metrics.num.samples = 2 >>> metrics.recording.level = INFO >>> metrics.sample.window.ms = 30000 >>> partition.assignment.strategy = [org.apache.kafka.streams.proc >>> essor.internals.StreamPartitionAssignor] >>> receive.buffer.bytes = 65536 >>> reconnect.backoff.max.ms = 1000 >>> reconnect.backoff.ms = 50 >>> request.timeout.ms = 300000 >>> retry.backoff.ms = 100 >>> sasl.jaas.config = null >>> sasl.kerberos.kinit.cmd = /usr/bin/kinit >>> sasl.kerberos.min.time.before.relogin = 60000 >>> sasl.kerberos.service.name = null >>> sasl.kerberos.ticket.renew.jitter = 0.05 >>> sasl.kerberos.ticket.renew.window.factor = 0.8 >>> sasl.mechanism = GSSAPI >>> security.protocol = PLAINTEXT >>> send.buffer.bytes = 131072 >>> session.timeout.ms = 180000 >>> ssl.cipher.suites = null >>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] >>> ssl.endpoint.identification.algorithm = null >>> ssl.key.password = null >>> ssl.keymanager.algorithm = SunX509 >>> ssl.keystore.location = null >>> ssl.keystore.password = null >>> ssl.keystore.type = JKS >>> ssl.protocol = TLS >>> ssl.provider = null >>> ssl.secure.random.implementation = null >>> ssl.trustmanager.algorithm = PKIX >>> ssl.truststore.location = null >>> ssl.truststore.password = null >>> ssl.truststore.type = JKS >>> value.deserializer = class org.apache.kafka.common.serial >>> ization.ByteArrayDeserializer >>> >>> INFO [2017-09-15 07:20:54,352] >>> org.apache.kafka.common.utils.AppInfoParser: >>> Kafka version : 0.11.0.0 >>> INFO [2017-09-15 07:20:54,352] >>> org.apache.kafka.common.utils.AppInfoParser: >>> Kafka commitId : cb8625948210849f >>> INFO [2017-09-15 07:20:54,352] >>> org.apache.kafka.streams.processor.internals.StreamThread: >>> stream-thread [cloud-stream-deduplicator-StreamThread-1] Creating >>> restore >>> consumer client >>> INFO [2017-09-15 07:20:54,353] >>> org.apache.kafka.clients.consumer.ConsumerConfig: >>> ConsumerConfig values: >>> auto.commit.interval.ms = 5000 >>> auto.offset.reset = latest >>> bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092] >>> check.crcs = true >>> client.id = >>> cloud-stream-deduplicator-StreamThread-1-restore-consumer >>> connections.max.idle.ms = 540000 >>> enable.auto.commit = false >>> exclude.internal.topics = true >>> fetch.max.bytes = 52428800 >>> fetch.max.wait.ms = 500 >>> fetch.min.bytes = 1 >>> group.id = >>> heartbeat.interval.ms = 3000 >>> interceptor.classes = null >>> internal.leave.group.on.close = false >>> isolation.level = read_uncommitted >>> key.deserializer = class org.apache.kafka.common.serial >>> ization.ByteArrayDeserializer >>> max.partition.fetch.bytes = 1048576 >>> max.poll.interval.ms = 2147483647 >>> max.poll.records = 1000 >>> metadata.max.age.ms = 300000 >>> metric.reporters = [] >>> metrics.num.samples = 2 >>> metrics.recording.level = INFO >>> metrics.sample.window.ms = 30000 >>> partition.assignment.strategy = [class >>> org.apache.kafka.clients.consu >>> mer.RangeAssignor] >>> receive.buffer.bytes = 65536 >>> reconnect.backoff.max.ms = 1000 >>> reconnect.backoff.ms = 50 >>> request.timeout.ms = 300000 >>> retry.backoff.ms = 100 >>> sasl.jaas.config = null >>> sasl.kerberos.kinit.cmd = /usr/bin/kinit >>> sasl.kerberos.min.time.before.relogin = 60000 >>> sasl.kerberos.service.name = null >>> sasl.kerberos.ticket.renew.jitter = 0.05 >>> sasl.kerberos.ticket.renew.window.factor = 0.8 >>> sasl.mechanism = GSSAPI >>> security.protocol = PLAINTEXT >>> send.buffer.bytes = 131072 >>> session.timeout.ms = 180000 >>> ssl.cipher.suites = null >>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] >>> ssl.endpoint.identification.algorithm = null >>> ssl.key.password = null >>> ssl.keymanager.algorithm = SunX509 >>> ssl.keystore.location = null >>> ssl.keystore.password = null >>> ssl.keystore.type = JKS >>> ssl.protocol = TLS >>> ssl.provider = null >>> ssl.secure.random.implementation = null >>> ssl.trustmanager.algorithm = PKIX >>> ssl.truststore.location = null >>> ssl.truststore.password = null >>> ssl.truststore.type = JKS >>> value.deserializer = class org.apache.kafka.common.serial >>> ization.ByteArrayDeserializer >>> >>> INFO [2017-09-15 07:20:54,369] >>> org.apache.kafka.common.utils.AppInfoParser: >>> Kafka version : 0.11.0.0 >>> INFO [2017-09-15 07:20:54,370] >>> org.apache.kafka.common.utils.AppInfoParser: >>> Kafka commitId : cb8625948210849f >>> INFO [2017-09-15 07:20:54,372] >>> org.apache.kafka.streams.processor.internals.StreamThread: >>> stream-thread [cloud-stream-deduplicator-StreamThread-1] State >>> transition >>> from CREATED to RUNNING. >>> INFO [2017-09-15 07:20:54,373] >>> org.apache.kafka.streams.processor.internals.StreamThread: >>> stream-thread [cloud-stream-deduplicator-StreamThread-2] Creating >>> consumer client >>> INFO [2017-09-15 07:20:54,375] >>> org.apache.kafka.clients.consumer.ConsumerConfig: >>> ConsumerConfig values: >>> auto.commit.interval.ms = 5000 >>> auto.offset.reset = latest >>> bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092] >>> check.crcs = true >>> client.id = cloud-stream-deduplicator-StreamThread-2-consumer >>> connections.max.idle.ms = 540000 >>> enable.auto.commit = false >>> exclude.internal.topics = true >>> fetch.max.bytes = 52428800 >>> fetch.max.wait.ms = 500 >>> fetch.min.bytes = 1 >>> group.id = cloud-stream-deduplicator >>> heartbeat.interval.ms = 3000 >>> interceptor.classes = null >>> internal.leave.group.on.close = false >>> isolation.level = read_uncommitted >>> key.deserializer = class org.apache.kafka.common.serial >>> ization.ByteArrayDeserializer >>> max.partition.fetch.bytes = 1048576 >>> max.poll.interval.ms = 2147483647 >>> max.poll.records = 1000 >>> metadata.max.age.ms = 300000 >>> metric.reporters = [] >>> metrics.num.samples = 2 >>> metrics.recording.level = INFO >>> metrics.sample.window.ms = 30000 >>> partition.assignment.strategy = [org.apache.kafka.streams.proc >>> essor.internals.StreamPartitionAssignor] >>> receive.buffer.bytes = 65536 >>> reconnect.backoff.max.ms = 1000 >>> reconnect.backoff.ms = 50 >>> request.timeout.ms = 300000 >>> retry.backoff.ms = 100 >>> sasl.jaas.config = null >>> sasl.kerberos.kinit.cmd = /usr/bin/kinit >>> sasl.kerberos.min.time.before.relogin = 60000 >>> sasl.kerberos.service.name = null >>> sasl.kerberos.ticket.renew.jitter = 0.05 >>> sasl.kerberos.ticket.renew.window.factor = 0.8 >>> sasl.mechanism = GSSAPI >>> security.protocol = PLAINTEXT >>> send.buffer.bytes = 131072 >>> session.timeout.ms = 180000 >>> ssl.cipher.suites = null >>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] >>> ssl.endpoint.identification.algorithm = null >>> ssl.key.password = null >>> ssl.keymanager.algorithm = SunX509 >>> ssl.keystore.location = null >>> ssl.keystore.password = null >>> ssl.keystore.type = JKS >>> ssl.protocol = TLS >>> ssl.provider = null >>> ssl.secure.random.implementation = null >>> ssl.trustmanager.algorithm = PKIX >>> ssl.truststore.location = null >>> ssl.truststore.password = null >>> ssl.truststore.type = JKS >>> value.deserializer = class org.apache.kafka.common.serial >>> ization.ByteArrayDeserializer >>> >>> INFO [2017-09-15 07:20:54,392] >>> org.apache.kafka.common.utils.AppInfoParser: >>> Kafka version : 0.11.0.0 >>> INFO [2017-09-15 07:20:54,395] >>> org.apache.kafka.common.utils.AppInfoParser: >>> Kafka commitId : cb8625948210849f >>> INFO [2017-09-15 07:20:54,396] >>> org.apache.kafka.streams.processor.internals.StreamThread: >>> stream-thread [cloud-stream-deduplicator-StreamThread-2] Creating >>> restore >>> consumer client >>> INFO [2017-09-15 07:20:54,396] >>> org.apache.kafka.clients.consumer.ConsumerConfig: >>> ConsumerConfig values: >>> auto.commit.interval.ms = 5000 >>> auto.offset.reset = latest >>> bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092] >>> check.crcs = true >>> client.id = >>> cloud-stream-deduplicator-StreamThread-2-restore-consumer >>> connections.max.idle.ms = 540000 >>> enable.auto.commit = false >>> exclude.internal.topics = true >>> fetch.max.bytes = 52428800 >>> fetch.max.wait.ms = 500 >>> fetch.min.bytes = 1 >>> group.id = >>> heartbeat.interval.ms = 3000 >>> interceptor.classes = null >>> internal.leave.group.on.close = false >>> isolation.level = read_uncommitted >>> key.deserializer = class org.apache.kafka.common.serial >>> ization.ByteArrayDeserializer >>> max.partition.fetch.bytes = 1048576 >>> max.poll.interval.ms = 2147483647 >>> max.poll.records = 1000 >>> metadata.max.age.ms = 300000 >>> metric.reporters = [] >>> metrics.num.samples = 2 >>> metrics.recording.level = INFO >>> metrics.sample.window.ms = 30000 >>> partition.assignment.strategy = [class >>> org.apache.kafka.clients.consu >>> mer.RangeAssignor] >>> receive.buffer.bytes = 65536 >>> reconnect.backoff.max.ms = 1000 >>> reconnect.backoff.ms = 50 >>> request.timeout.ms = 300000 >>> retry.backoff.ms = 100 >>> sasl.jaas.config = null >>> sasl.kerberos.kinit.cmd = /usr/bin/kinit >>> sasl.kerberos.min.time.before.relogin = 60000 >>> sasl.kerberos.service.name = null >>> sasl.kerberos.ticket.renew.jitter = 0.05 >>> sasl.kerberos.ticket.renew.window.factor = 0.8 >>> sasl.mechanism = GSSAPI >>> security.protocol = PLAINTEXT >>> send.buffer.bytes = 131072 >>> session.timeout.ms = 180000 >>> ssl.cipher.suites = null >>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] >>> ssl.endpoint.identification.algorithm = null >>> ssl.key.password = null >>> ssl.keymanager.algorithm = SunX509 >>> ssl.keystore.location = null >>> ssl.keystore.password = null >>> ssl.keystore.type = JKS >>> ssl.protocol = TLS >>> ssl.provider = null >>> ssl.secure.random.implementation = null >>> ssl.trustmanager.algorithm = PKIX >>> ssl.truststore.location = null >>> ssl.truststore.password = null >>> ssl.truststore.type = JKS >>> value.deserializer = class org.apache.kafka.common.serial >>> ization.ByteArrayDeserializer >>> >>> INFO [2017-09-15 07:20:54,407] >>> org.apache.kafka.common.utils.AppInfoParser: >>> Kafka version : 0.11.0.0 >>> INFO [2017-09-15 07:20:54,407] >>> org.apache.kafka.common.utils.AppInfoParser: >>> Kafka commitId : cb8625948210849f >>> INFO [2017-09-15 07:20:54,408] >>> org.apache.kafka.streams.processor.internals.StreamThread: >>> stream-thread [cloud-stream-deduplicator-StreamThread-2] State >>> transition >>> from CREATED to RUNNING. >>> INFO [2017-09-15 07:20:54,414] >>> org.apache.kafka.streams.processor.internals.StreamThread: >>> stream-thread [cloud-stream-deduplicator-StreamThread-3] Creating >>> consumer client >>> INFO [2017-09-15 07:20:54,417] >>> org.apache.kafka.clients.consumer.ConsumerConfig: >>> ConsumerConfig values: >>> auto.commit.interval.ms = 5000 >>> auto.offset.reset = latest >>> bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092] >>> check.crcs = true >>> client.id = cloud-stream-deduplicator-StreamThread-3-consumer >>> connections.max.idle.ms = 540000 >>> enable.auto.commit = false >>> exclude.internal.topics = true >>> fetch.max.bytes = 52428800 >>> fetch.max.wait.ms = 500 >>> fetch.min.bytes = 1 >>> group.id = cloud-stream-deduplicator >>> heartbeat.interval.ms = 3000 >>> interceptor.classes = null >>> internal.leave.group.on.close = false >>> isolation.level = read_uncommitted >>> key.deserializer = class org.apache.kafka.common.serial >>> ization.ByteArrayDeserializer >>> max.partition.fetch.bytes = 1048576 >>> max.poll.interval.ms = 2147483647 >>> max.poll.records = 1000 >>> metadata.max.age.ms = 300000 >>> metric.reporters = [] >>> metrics.num.samples = 2 >>> metrics.recording.level = INFO >>> metrics.sample.window.ms = 30000 >>> partition.assignment.strategy = [org.apache.kafka.streams.proc >>> essor.internals.StreamPartitionAssignor] >>> receive.buffer.bytes = 65536 >>> reconnect.backoff.max.ms = 1000 >>> reconnect.backoff.ms = 50 >>> request.timeout.ms = 300000 >>> retry.backoff.ms = 100 >>> sasl.jaas.config = null >>> sasl.kerberos.kinit.cmd = /usr/bin/kinit >>> sasl.kerberos.min.time.before.relogin = 60000 >>> sasl.kerberos.service.name = null >>> sasl.kerberos.ticket.renew.jitter = 0.05 >>> sasl.kerberos.ticket.renew.window.factor = 0.8 >>> sasl.mechanism = GSSAPI >>> security.protocol = PLAINTEXT >>> send.buffer.bytes = 131072 >>> session.timeout.ms = 180000 >>> ssl.cipher.suites = null >>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] >>> ssl.endpoint.identification.algorithm = null >>> ssl.key.password = null >>> ssl.keymanager.algorithm = SunX509 >>> ssl.keystore.location = null >>> ssl.keystore.password = null >>> ssl.keystore.type = JKS >>> ssl.protocol = TLS >>> ssl.provider = null >>> ssl.secure.random.implementation = null >>> ssl.trustmanager.algorithm = PKIX >>> ssl.truststore.location = null >>> ssl.truststore.password = null >>> ssl.truststore.type = JKS >>> value.deserializer = class org.apache.kafka.common.serial >>> ization.ByteArrayDeserializer >>> >>> INFO [2017-09-15 07:20:54,447] >>> org.apache.kafka.common.utils.AppInfoParser: >>> Kafka version : 0.11.0.0 >>> INFO [2017-09-15 07:20:54,451] >>> org.apache.kafka.common.utils.AppInfoParser: >>> Kafka commitId : cb8625948210849f >>> INFO [2017-09-15 07:20:54,452] >>> org.apache.kafka.streams.processor.internals.StreamThread: >>> stream-thread [cloud-stream-deduplicator-StreamThread-3] Creating >>> restore >>> consumer client >>> INFO [2017-09-15 07:20:54,452] >>> org.apache.kafka.clients.consumer.ConsumerConfig: >>> ConsumerConfig values: >>> auto.commit.interval.ms = 5000 >>> auto.offset.reset = latest >>> bootstrap.servers = [10.131.43.70:9092, 10.131.43.75:9092] >>> check.crcs = true >>> client.id = >>> cloud-stream-deduplicator-StreamThread-3-restore-consumer >>> connections.max.idle.ms = 540000 >>> enable.auto.commit = false >>> exclude.internal.topics = true >>> fetch.max.bytes = 52428800 >>> fetch.max.wait.ms = 500 >>> fetch.min.bytes = 1 >>> group.id = >>> heartbeat.interval.ms = 3000 >>> interceptor.classes = null >>> internal.leave.group.on.close = false >>> isolation.level = read_uncommitted >>> key.deserializer = class org.apache.kafka.common.serial >>> ization.ByteArrayDeserializer >>> max.partition.fetch.bytes = 1048576 >>> max.poll.interval.ms = 2147483647 >>> max.poll.records = 1000 >>> metadata.max.age.ms = 300000 >>> metric.reporters = [] >>> metrics.num.samples = 2 >>> metrics.recording.level = INFO >>> metrics.sample.window.ms = 30000 >>> partition.assignment.strategy = [class >>> org.apache.kafka.clients.consu >>> mer.RangeAssignor] >>> receive.buffer.bytes = 65536 >>> reconnect.backoff.max.ms = 1000 >>> reconnect.backoff.ms = 50 >>> request.timeout.ms = 300000 >>> retry.backoff.ms = 100 >>> sasl.jaas.config = null >>> sasl.kerberos.kinit.cmd = /usr/bin/kinit >>> sasl.kerberos.min.time.before.relogin = 60000 >>> sasl.kerberos.service.name = null >>> sasl.kerberos.ticket.renew.jitter = 0.05 >>> sasl.kerberos.ticket.renew.window.factor = 0.8 >>> sasl.mechanism = GSSAPI >>> security.protocol = PLAINTEXT >>> send.buffer.bytes = 131072 >>> session.timeout.ms = 180000 >>> ssl.cipher.suites = null >>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] >>> ssl.endpoint.identification.algorithm = null >>> ssl.key.password = null >>> ssl.keymanager.algorithm = SunX509 >>> ssl.keystore.location = null >>> ssl.keystore.password = null >>> ssl.keystore.type = JKS >>> ssl.protocol = TLS >>> ssl.provider = null >>> ssl.secure.random.implementation = null >>> ssl.trustmanager.algorithm = PKIX >>> ssl.truststore.location = null >>> ssl.truststore.password = null >>> ssl.truststore.type = JKS >>> value.deserializer = class org.apache.kafka.common.serial >>> ization.ByteArrayDeserializer >>> >>> INFO [2017-09-15 07:20:54,455] >>> org.apache.kafka.common.utils.AppInfoParser: >>> Kafka version : 0.11.0.0 >>> INFO [2017-09-15 07:20:54,459] >>> org.apache.kafka.common.utils.AppInfoParser: >>> Kafka commitId : cb8625948210849f >>> INFO [2017-09-15 07:20:54,460] >>> org.apache.kafka.streams.processor.internals.StreamThread: >>> stream-thread [cloud-stream-deduplicator-StreamThread-3] State >>> transition >>> from CREATED to RUNNING. >>> INFO [2017-09-15 07:20:54,672] org.apache.kafka.streams.KafkaStreams: >>> stream-client [cloud-stream-deduplicator] State transition from >>> CREATED to >>> RUNNING. >>> INFO [2017-09-15 07:20:54,679] >>> org.apache.kafka.streams.processor.internals.StreamThread: >>> stream-thread [cloud-stream-deduplicator-StreamThread-1] Starting >>> INFO [2017-09-15 07:20:54,690] >>> org.apache.kafka.streams.processor.internals.StreamThread: >>> stream-thread [cloud-stream-deduplicator-StreamThread-2] Starting >>> INFO [2017-09-15 07:20:54,691] org.apache.kafka.streams.KafkaStreams: >>> stream-client [cloud-stream-deduplicator] Started Kafka Stream process >>> INFO [2017-09-15 07:20:54,691] com.mytest.csd.kafka.KafkaDeduplicator: >>> Stream topology: >>> KafkaStreams processID: 1ec2c01e-9738-4ce4-b540-8bd0820d2ea6 >>> StreamsThread appId: cloud-stream-deduplicator >>> StreamsThread clientId: cloud-stream-deduplicator >>> StreamsThread threadId: >>> cloud-stream-deduplicator-StreamThread-1 >>> Active tasks: >>> Standby tasks: >>> >>> StreamsThread appId: cloud-stream-deduplicator >>> StreamsThread clientId: cloud-stream-deduplicator >>> StreamsThread threadId: >>> cloud-stream-deduplicator-StreamThread-2 >>> Active tasks: >>> Standby tasks: >>> >>> StreamsThread appId: cloud-stream-deduplicator >>> StreamsThread clientId: cloud-stream-deduplicator >>> StreamsThread threadId: >>> cloud-stream-deduplicator-StreamThread-3 >>> Active tasks: >>> Standby tasks: >>> >>> >
signature.asc
Description: OpenPGP digital signature