Hi Damian - I am not sure I understand what you mean. I have the following set in the application. Do I need to set anything else at the host level? An environment variable?
val streamingConfig = {
  val settings = new Properties
  settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-weblog-processing")
  settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)

  config.schemaRegistryUrl.foreach { url =>
    settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, url)
  }

  settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
  settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)

  // setting offset reset to earliest so that we can re-run the demo code
  // with the same pre-loaded data
  // Note: To re-run the demo, you need to use the offset reset tool:
  // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
  settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  // need this for the query service
  settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"${config.httpInterface}:${config.httpPort}")

  // default is /tmp/kafka-streams
  settings.put(StreamsConfig.STATE_DIR_CONFIG, config.stateStoreDir)

  // Set the commit interval to 500ms so that any changes are flushed frequently
  // and the summary data are updated with low latency.
  settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")

  settings
}

Please explain a bit. Regards.
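P.S. To make sure I understand - is something like the following what you mean by "set appropriately for both hosts"? Just a sketch of the one line I would change in the block above (settings and config.httpPort as defined there), assuming the scheduler injects each task's routable address in a HOST_IP environment variable - that variable name is my guess, I still need to check what our DC/OS setup actually provides:

  import java.net.InetAddress

  // application.server must advertise an address that the *other* instance
  // can reach, so each instance needs its own host-specific value; a fixed
  // value such as "localhost" would make every metadata lookup point back
  // at the wrong instance.
  // HOST_IP is hypothetical - whatever the scheduler injects for the task;
  // fall back to this host's own address if it is absent.
  val advertisedHost: String =
    sys.env.getOrElse("HOST_IP", InetAddress.getLocalHost.getHostAddress)

  settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"$advertisedHost:${config.httpPort}")

(I realise InetAddress.getLocalHost is itself only a fallback - on a multi-homed box it may not pick the routable interface.)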
On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi,
>
> Do you have the application.server property set appropriately for both
> hosts?
>
> The second stack trace is this bug:
> https://issues.apache.org/jira/browse/KAFKA-5556
>
> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
> > Hi -
> >
> > In my Kafka Streams application, I have a state store resulting from a
> > stateful streaming topology. The environment is:
> >
> > - Kafka 0.10.2.1
> > - It runs on a DC/OS cluster
> > - I am running Confluent Kafka 3.2.2 on the cluster
> > - Each of my topics has 2 partitions with a replication factor of 2
> > - The application also has an associated HTTP service that does
> >   interactive queries on the state store
> >
> > The application runs fine when I invoke a single instance. I can use
> > the HTTP endpoints to do queries and everything looks good. Problems
> > surface when I try to spawn another instance of the application. I use
> > the same APPLICATION_ID_CONFIG, and the instance starts on a different
> > node of the cluster. The data consumption part works fine, as the new
> > instance also starts consuming from the same topic as the first one.
> > But when I try the HTTP query, the metadata fetch fails.
> >
> > I have a code snippet like this as part of the query that tries to
> > fetch the metadata so that I can locate the host to query on:
> >
> > metadataService.streamsMetadataForStoreAndKey(store, hostKey, stringSerializer) match {
> >   case Success(host) => {
> >     // hostKey is on another instance: call that instance to fetch the data
> >     if (!thisHost(host)) {
> >       logger.warn(s"Key $hostKey is on another instance, not on $host - re-querying ..")
> >       httpRequester.queryFromHost[Long](host, path)
> >     } else {
> >       // hostKey is on this instance
> >       localStateStoreQuery.queryStateStore(streams, store, hostKey)
> >     }
> >   }
> >   case Failure(ex) => Future.failed(ex)
> > }
> >
> > and metadataService.streamsMetadataForStoreAndKey makes the following
> > call:
> >
> > streams.metadataForKey(store, key, serializer) match {
> >   case null =>
> >     throw new IllegalArgumentException(s"Metadata for key $key not found in $store")
> >   case metadata =>
> >     new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toSet)
> > }
> >
> > When I start the second instance, streams.metadataForKey returns null
> > for any key I pass. Here's the relevant stack trace:
> >
> > java.lang.IllegalArgumentException: Metadata for key mtc.clark.net not found in access-count-per-host
> >   at com.xx.fdp.sample.kstream.services.MetadataService.$anonfun$streamsMetadataForStoreAndKey$1(MetadataService.scala:51)
> >   at scala.util.Try$.apply(Try.scala:209)
> >   at com.xx.fdp.sample.kstream.services.MetadataService.streamsMetadataForStoreAndKey(MetadataService.scala:46)
> >   at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchSummaryInfo(KeyValueFetcher.scala:36)
> >   at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchAccessCountSummary(KeyValueFetcher.scala:29)
> >   at com.xx.fdp.sample.kstream.http.WeblogDSLHttpService.$anonfun$routes$10(WeblogDSLHttpService.scala:41)
> >   at akka.http.scaladsl.server.directives.RouteDirectives.$anonfun$complete$1(RouteDirectives.scala:47)
> >   at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
> >   at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
> >   ...
> >
> > Following this exception I get another one, which looks like an
> > internal exception that stops the application:
> >
> > 09:57:33.731 TKD [StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread -
> > stream-thread [StreamThread-1] Streams application error during processing:
> > java.lang.IllegalStateException: Attempt to retrieve exception from future which hasn't failed
> >   at org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> >   at org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> >   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> >   at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> >   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> >   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >
> > BTW, the application runs fine when I have 2 instances running on the
> > same host (my laptop), or a single instance on the cluster. The problem
> > surfaces only when I start the second instance on the cluster.
> >
> > Any help / pointer / area to look into?
> >
> > --
> > Debasish Ghosh
> > http://manning.com/ghosh2
> > http://manning.com/ghosh
> >
> > Twttr: @debasishg
> > Blog: http://debasishg.blogspot.com
> > Code: http://github.com/debasishg

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
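P.P.S. In the meantime I am also trying a retry around the metadata lookup, in case the metadata is only transiently missing while the group rebalances after the second instance joins. Just a sketch - the attempt count and back-off are arbitrary, and metadataWithRetry is my own helper that streamsMetadataForStoreAndKey could call instead of streams.metadataForKey directly:

  import org.apache.kafka.common.serialization.Serializer
  import org.apache.kafka.streams.KafkaStreams
  import org.apache.kafka.streams.state.StreamsMetadata

  import scala.annotation.tailrec

  // Retry metadataForKey a few times before treating null as an error:
  // right after an instance joins, the group rebalances and the metadata
  // may simply not be ready yet.
  @tailrec
  def metadataWithRetry[K](streams: KafkaStreams, store: String, key: K,
                           serializer: Serializer[K], attempts: Int = 5): StreamsMetadata =
    streams.metadataForKey(store, key, serializer) match {
      case null if attempts > 1 =>
        Thread.sleep(200) // brief back-off while the rebalance completes
        metadataWithRetry(streams, store, key, serializer, attempts - 1)
      case null =>
        throw new IllegalArgumentException(s"Metadata for key $key not found in $store")
      case metadata => metadata
    }

That would only help if the null is transient, of course - it does nothing for a wrong application.server setting.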