Do you have any logs that might help to work out what is going wrong?

On Fri, 28 Jul 2017 at 14:16 Damian Guy <damian....@gmail.com> wrote:
> The config looks ok to me
>
> On Fri, 28 Jul 2017 at 13:24 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>
>> I am setting APPLICATION_SERVER_CONFIG, which is possibly what you are
>> referring to. Just now I noticed that I may also need to set
>> REPLICATION_FACTOR_CONFIG, which needs to be set to 2 (the default is 1).
>> Is there anything else that I may be missing?
>>
>> regards.
>>
>> On Fri, Jul 28, 2017 at 5:46 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>
>>> Hi Damian -
>>>
>>> I am not sure I understand what you mean .. I have the following set in the
>>> application .. Do I need to set anything else at the host level? An
>>> environment variable?
>>>
>>>   val streamingConfig = {
>>>     val settings = new Properties
>>>     settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-weblog-processing")
>>>     settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
>>>
>>>     config.schemaRegistryUrl.foreach { url =>
>>>       settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, url)
>>>     }
>>>
>>>     settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
>>>     settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
>>>
>>>     // setting offset reset to earliest so that we can re-run the demo
>>>     // code with the same pre-loaded data
>>>     // Note: To re-run the demo, you need to use the offset reset tool:
>>>     // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
>>>     settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>>>
>>>     // need this for the query service
>>>     settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"${config.httpInterface}:${config.httpPort}")
>>>
>>>     // default is /tmp/kafka-streams
>>>     settings.put(StreamsConfig.STATE_DIR_CONFIG, config.stateStoreDir)
>>>
>>>     // Set the commit interval to 500 ms so that any changes are flushed
>>>     // frequently and the summary data are updated with low latency.
>>>     settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")
>>>
>>>     settings
>>>   }
>>>
>>> Please explain a bit ..
>>>
>>> regards.
>>>
>>> On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy <damian....@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Do you have the application.server property set appropriately for both
>>>> hosts?
>>>>
>>>> The second stack trace is this bug:
>>>> https://issues.apache.org/jira/browse/KAFKA-5556
>>>>
>>>> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>
>>>>> Hi -
>>>>>
>>>>> In my Kafka Streams application, I have a state store resulting from a
>>>>> stateful streaming topology. The environment is
>>>>>
>>>>> - Kafka 0.10.2.1
>>>>> - It runs on a DC/OS cluster
>>>>> - I am running Confluent Kafka 3.2.2 on the cluster
>>>>> - Each topic that I have has 2 partitions with replication factor = 2
>>>>> - The application also has an associated HTTP service that does
>>>>>   interactive queries on the state store
>>>>>
>>>>> The application runs fine when I invoke a single instance. I can use the
>>>>> HTTP endpoints to do queries and everything looks good. Problems surface
>>>>> when I try to spawn another instance of the application. I use the same
>>>>> APPLICATION_ID_CONFIG, and the instance starts on a different node of the
>>>>> cluster. The data consumption part works fine, as the new instance also
>>>>> starts consuming from the same topic as the first one. But when I try the
>>>>> HTTP query, the metadata fetch fails ..
>>>>>
>>>>> I have some code snippet like this as part of the query that tries to fetch
>>>>> the metadata, so that I can locate the host to query on ..
>>>>>   metadataService.streamsMetadataForStoreAndKey(store, hostKey, stringSerializer) match {
>>>>>     case Success(host) => {
>>>>>       // hostKey is on another instance. call the other instance to fetch the data.
>>>>>       if (!thisHost(host)) {
>>>>>         logger.warn(s"Key $hostKey is on another instance not on $host - requerying ..")
>>>>>         httpRequester.queryFromHost[Long](host, path)
>>>>>       } else {
>>>>>         // hostKey is on this instance
>>>>>         localStateStoreQuery.queryStateStore(streams, store, hostKey)
>>>>>       }
>>>>>     }
>>>>>     case Failure(ex) => Future.failed(ex)
>>>>>   }
>>>>>
>>>>> and metadataService.streamsMetadataForStoreAndKey has the following call ..
>>>>>
>>>>>   streams.metadataForKey(store, key, serializer) match {
>>>>>     case null => throw new IllegalArgumentException(s"Metadata for key $key not found in $store")
>>>>>     case metadata => new HostStoreInfo(metadata.host, metadata.port,
>>>>>       metadata.stateStoreNames.asScala.toSet)
>>>>>   }
>>>>>
>>>>> When I start the second instance, streams.metadataForKey returns null for
>>>>> any key I pass .. Here's the relevant stack trace ..
>>>>>   java.lang.IllegalArgumentException: Metadata for key mtc.clark.net not found in access-count-per-host
>>>>>     at com.xx.fdp.sample.kstream.services.MetadataService.$anonfun$streamsMetadataForStoreAndKey$1(MetadataService.scala:51)
>>>>>     at scala.util.Try$.apply(Try.scala:209)
>>>>>     at com.xx.fdp.sample.kstream.services.MetadataService.streamsMetadataForStoreAndKey(MetadataService.scala:46)
>>>>>     at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchSummaryInfo(KeyValueFetcher.scala:36)
>>>>>     at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchAccessCountSummary(KeyValueFetcher.scala:29)
>>>>>     at com.xx.fdp.sample.kstream.http.WeblogDSLHttpService.$anonfun$routes$10(WeblogDSLHttpService.scala:41)
>>>>>     at akka.http.scaladsl.server.directives.RouteDirectives.$anonfun$complete$1(RouteDirectives.scala:47)
>>>>>     at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
>>>>>     at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
>>>>>     ...
>>>>>
>>>>> and following this exception I get another one, which looks like an internal
>>>>> exception that stops the application ..
>>>>>
>>>>>   09:57:33.731 TKD [StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread -
>>>>>   stream-thread [StreamThread-1] Streams application error during processing:
>>>>>   java.lang.IllegalStateException: Attempt to retrieve exception from future which hasn't failed
>>>>>     at org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
>>>>>     at org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
>>>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>>
>>>>> BTW the application runs fine when I have 2 instances running on the same
>>>>> host (my laptop), or 1 instance on the cluster. The problem surfaces only
>>>>> when I start the second instance.
>>>>>
>>>>> Any help / pointer / area to look into?
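A common workaround when metadataForKey transiently returns null (for example while a rebalance is still in flight after the second instance joins) is to retry the lookup with a backoff instead of failing on the first null. The sketch below factors that retry into a generic helper; the helper name, defaults, and the idea of treating null as "not yet available" are assumptions for illustration, not code from this thread:

```scala
import scala.annotation.tailrec

object MetadataLookup {
  // Generic retry helper: re-run `lookup` until `available` holds on its
  // result, or the retry budget is exhausted, sleeping `backoffMs` between
  // attempts. Returns None when the metadata never became available.
  @tailrec
  def retryUntil[A](lookup: () => A)(available: A => Boolean,
                                     retries: Int = 5,
                                     backoffMs: Long = 100): Option[A] = {
    val result = lookup()
    if (available(result)) Some(result)
    else if (retries <= 0) None
    else {
      Thread.sleep(backoffMs)
      retryUntil(lookup)(available, retries - 1, backoffMs)
    }
  }
}
```

Wrapped around the lookup from the thread, this would look something like
`retryUntil(() => streams.metadataForKey(store, key, serializer))(m => m != null)`,
with the existing IllegalArgumentException raised only when the retries are exhausted.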
>>>>>
>>>>> --
>>>>> Debasish Ghosh
>>>>> http://manning.com/ghosh2
>>>>> http://manning.com/ghosh
>>>>>
>>>>> Twttr: @debasishg
>>>>> Blog: http://debasishg.blogspot.com
>>>>> Code: http://github.com/debasishg
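The application.server discussion in the thread comes down to each instance advertising a host:port that the other instances can actually reach, plus REPLICATION_FACTOR_CONFIG covering the internal topics when more than one node is involved. A minimal sketch of how the per-instance values might be derived on a DC/OS-style cluster; the HOST/PORT0 environment variable names are assumptions (Marathon-style conventions), not fixed requirements:

```scala
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

object PerInstanceConfig {
  // Each instance must advertise an endpoint reachable by the *other*
  // instances, so a hard-coded localhost value will break multi-node
  // interactive queries. Here the endpoint is taken from environment
  // variables injected per task by the scheduler.
  def perInstanceSettings(base: Properties): Properties = {
    val host = sys.env.getOrElse("HOST", "localhost")
    val port = sys.env.getOrElse("PORT0", "7070")
    base.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"$host:$port")
    // Internal changelog/repartition topics should survive the loss of
    // one broker when running with multiple instances.
    base.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "2")
    base
  }
}
```

With this shape, the same application artifact can be launched on every node and each task picks up its own advertised endpoint, which is what streams.metadataForKey needs in order to route a key to the right instance.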