Hi,

Do you have the application.server property set appropriately for both hosts?
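For reference, a minimal sketch of what I mean, assuming each instance knows the host and port on which its own HTTP query service is reachable from the other node. The application id, broker address, host names and port here are made up for illustration, and `StreamsProps.forInstance` is a hypothetical helper, not something from your code:

```scala
import java.util.Properties

object StreamsProps {
  // Builds the Streams configuration for one instance.
  // `host` and `port` are assumptions: they should be the address at which
  // THIS instance's HTTP query endpoint can be reached by the other instances.
  def forInstance(host: String, port: Int): Properties = {
    val props = new Properties()
    // Same value on every instance of the application (made-up id).
    props.put("application.id", "weblog-access-count")
    // Made-up broker address.
    props.put("bootstrap.servers", "broker-1:9092")
    // Different on every instance, in host:port form.
    props.put("application.server", s"$host:$port")
    props
  }
}
```

Each instance would then pass something like `StreamsProps.forInstance("node-a.example.com", 7070)` to its `KafkaStreams` constructor. As far as I know, the host and port you read from the metadata in your query code come from this setting, so it has to differ per instance and be resolvable from the other node.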
The second stack trace is this bug: https://issues.apache.org/jira/browse/KAFKA-5556

On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> Hi -
>
> In my Kafka Streams application, I have a state store resulting from a
> stateful streaming topology. The environment is
>
> - Kafka 0.10.2.1
> - It runs on a DC/OS cluster
> - I am running Confluent-Kafka 3.2.2 on the cluster
> - Each topic that I have has 2 partitions with replication factor = 2
> - The application also has an associated http service that does
>   interactive queries on the state store
>
> The application runs fine when I invoke a single instance. I can use the
> http endpoints to do queries and everything looks good. Problems surface
> when I try to spawn another instance of the application. I use the same
> APPLICATION_ID_CONFIG and the instance starts on a different node of the
> cluster. The data consumption part works fine as the new instance also
> starts consuming from the same topic as the first one. But when I try the
> http query, the metadata fetch fails ..
>
> I have some code snippet like this as part of the query that tries to fetch
> the metadata so that I can locate the host to query on ..
>
> metadataService.streamsMetadataForStoreAndKey(store, hostKey, stringSerializer) match {
>   case Success(host) => {
>     // hostKey is on another instance. call the other instance to fetch the data.
>     if (!thisHost(host)) {
>       logger.warn(s"Key $hostKey is on another instance not on $host - requerying ..")
>       httpRequester.queryFromHost[Long](host, path)
>     } else {
>       // hostKey is on this instance
>       localStateStoreQuery.queryStateStore(streams, store, hostKey)
>     }
>   }
>   case Failure(ex) => Future.failed(ex)
> }
>
> and the metadataService.streamsMetadataForStoreAndKey has the following
> call ..
>
> streams.metadataForKey(store, key, serializer) match {
>   case null => throw new IllegalArgumentException(s"Metadata for key $key not found in $store")
>   case metadata => new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toSet)
> }
>
> When I start the second instance, streams.metadataForKey returns null for
> any key I pass .. Here's the relevant stack trace ..
>
> java.lang.IllegalArgumentException: Metadata for key mtc.clark.net not found in access-count-per-host
>     at com.xx.fdp.sample.kstream.services.MetadataService.$anonfun$streamsMetadataForStoreAndKey$1(MetadataService.scala:51)
>     at scala.util.Try$.apply(Try.scala:209)
>     at com.xx.fdp.sample.kstream.services.MetadataService.streamsMetadataForStoreAndKey(MetadataService.scala:46)
>     at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchSummaryInfo(KeyValueFetcher.scala:36)
>     at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchAccessCountSummary(KeyValueFetcher.scala:29)
>     at com.xx.fdp.sample.kstream.http.WeblogDSLHttpService.$anonfun$routes$10(WeblogDSLHttpService.scala:41)
>     at akka.http.scaladsl.server.directives.RouteDirectives.$anonfun$complete$1(RouteDirectives.scala:47)
>     at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
>     at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
>     ...
>
> and following this exception I get another one which looks like an internal
> exception that stops the application ..
>
> 09:57:33.731 TKD [StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Streams application error during processing:
> java.lang.IllegalStateException: Attempt to retrieve exception from future which hasn't failed
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>
> BTW the application runs fine when I have 2 instances running on the same
> host (my laptop) or on 1 instance on the cluster. The problem surfaces only
> when I start the second instance.
>
> Any help / pointer / area to look into ?
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg