The log file is huge, but I can send it to you. Before that, let me confirm one point ..
I set APPLICATION_SERVER_CONFIG to s"${config.httpInterface}:${config.httpPort}". In my case httpInterface is "0.0.0.0" and the port is set to 7070. Since the two instances start on different nodes, this should be OK - right?

regards.

On Fri, Jul 28, 2017 at 8:18 PM, Damian Guy <damian....@gmail.com> wrote:

> Do you have any logs that might help to work out what is going wrong?
>
> On Fri, 28 Jul 2017 at 14:16 Damian Guy <damian....@gmail.com> wrote:
>
>> The config looks ok to me
>>
>> On Fri, 28 Jul 2017 at 13:24 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>
>>> I am setting APPLICATION_SERVER_CONFIG, which is possibly what you are
>>> referring to. Just now I noticed that I may also need to set
>>> REPLICATION_FACTOR_CONFIG, which needs to be set to 2 (the default is 1).
>>> Is there anything else that I may be missing?
>>>
>>> regards.
>>>
>>> On Fri, Jul 28, 2017 at 5:46 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>
>>> > Hi Damian -
>>> >
>>> > I am not sure I understand what you mean .. I have the following set in
>>> > the application .. Do I need to set anything else at the host level?
>>> > An environment variable?
>>> >
>>> > val streamingConfig = {
>>> >   val settings = new Properties
>>> >   settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-weblog-processing")
>>> >   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
>>> >
>>> >   config.schemaRegistryUrl.foreach { url =>
>>> >     settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, url)
>>> >   }
>>> >
>>> >   settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
>>> >   settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
>>> >
>>> >   // Set offset reset to earliest so that we can re-run the demo code
>>> >   // with the same pre-loaded data.
>>> >   // Note: to re-run the demo, you need to use the offset reset tool:
>>> >   // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
>>> >   settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>>> >
>>> >   // needed for the interactive query service
>>> >   settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"${config.httpInterface}:${config.httpPort}")
>>> >
>>> >   // default is /tmp/kafka-streams
>>> >   settings.put(StreamsConfig.STATE_DIR_CONFIG, config.stateStoreDir)
>>> >
>>> >   // Set the commit interval to 500 ms so that any changes are flushed
>>> >   // frequently and the summary data are updated with low latency.
>>> >   settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")
>>> >
>>> >   settings
>>> > }
>>> >
>>> > Please explain a bit ..
>>> >
>>> > regards.
>>> >
>>> > On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy <damian....@gmail.com> wrote:
>>> >
>>> >> Hi,
>>> >>
>>> >> Do you have the application.server property set appropriately for both
>>> >> hosts?
>>> >>
>>> >> The second stack trace is this bug:
>>> >> https://issues.apache.org/jira/browse/KAFKA-5556
>>> >>
>>> >> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>> >>
>>> >> > Hi -
>>> >> >
>>> >> > In my Kafka Streams application, I have a state store resulting from
>>> >> > a stateful streaming topology.
>>> >> > The environment is
>>> >> >
>>> >> > - Kafka 0.10.2.1
>>> >> > - It runs on a DC/OS cluster
>>> >> > - I am running Confluent Kafka 3.2.2 on the cluster
>>> >> > - Each topic that I have has 2 partitions with replication factor = 2
>>> >> > - The application also has an associated HTTP service that does
>>> >> >   interactive queries on the state store
>>> >> >
>>> >> > The application runs fine when I invoke a single instance. I can use
>>> >> > the HTTP endpoints to do queries and everything looks good. Problems
>>> >> > surface when I try to spawn another instance of the application. I use
>>> >> > the same APPLICATION_ID_CONFIG, and the instance starts on a different
>>> >> > node of the cluster. The data consumption part works fine, as the new
>>> >> > instance also starts consuming from the same topic as the first one.
>>> >> > But when I try the HTTP query, the metadata fetch fails ..
>>> >> >
>>> >> > I have a code snippet like this as part of the query that tries to
>>> >> > fetch the metadata so that I can locate the host to query on ..
>>> >> >
>>> >> > metadataService.streamsMetadataForStoreAndKey(store, hostKey, stringSerializer) match {
>>> >> >   case Success(host) =>
>>> >> >     // hostKey is on another instance: call the other instance to fetch the data
>>> >> >     if (!thisHost(host)) {
>>> >> >       logger.warn(s"Key $hostKey is on another instance not on $host - requerying ..")
>>> >> >       httpRequester.queryFromHost[Long](host, path)
>>> >> >     } else {
>>> >> >       // hostKey is on this instance
>>> >> >       localStateStoreQuery.queryStateStore(streams, store, hostKey)
>>> >> >     }
>>> >> >   case Failure(ex) => Future.failed(ex)
>>> >> > }
>>> >> >
>>> >> > and metadataService.streamsMetadataForStoreAndKey makes the following
>>> >> > call ..
>>> >> > streams.metadataForKey(store, key, serializer) match {
>>> >> >   case null => throw new IllegalArgumentException(s"Metadata for key $key not found in $store")
>>> >> >   case metadata => new HostStoreInfo(metadata.host, metadata.port,
>>> >> >     metadata.stateStoreNames.asScala.toSet)
>>> >> > }
>>> >> >
>>> >> > When I start the second instance, streams.metadataForKey returns null
>>> >> > for any key I pass .. Here's the relevant stack trace ..
>>> >> >
>>> >> > java.lang.IllegalArgumentException: Metadata for key mtc.clark.net not found in access-count-per-host
>>> >> >   at com.xx.fdp.sample.kstream.services.MetadataService.$anonfun$streamsMetadataForStoreAndKey$1(MetadataService.scala:51)
>>> >> >   at scala.util.Try$.apply(Try.scala:209)
>>> >> >   at com.xx.fdp.sample.kstream.services.MetadataService.streamsMetadataForStoreAndKey(MetadataService.scala:46)
>>> >> >   at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchSummaryInfo(KeyValueFetcher.scala:36)
>>> >> >   at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchAccessCountSummary(KeyValueFetcher.scala:29)
>>> >> >   at com.xx.fdp.sample.kstream.http.WeblogDSLHttpService.$anonfun$routes$10(WeblogDSLHttpService.scala:41)
>>> >> >   at akka.http.scaladsl.server.directives.RouteDirectives.$anonfun$complete$1(RouteDirectives.scala:47)
>>> >> >   at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
>>> >> >   at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
>>> >> >   ...
>>> >> >
>>> >> > and following this exception I get another one, which looks like an
>>> >> > internal exception that stops the application ..
>>> >> >
>>> >> > 09:57:33.731 TKD [StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread -
>>> >> > stream-thread [StreamThread-1] Streams application error during processing:
>>> >> > java.lang.IllegalStateException: Attempt to retrieve exception from future which hasn't failed
>>> >> >   at org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
>>> >> >   at org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
>>> >> >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
>>> >> >   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>>> >> >   at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>>> >> >   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
>>> >> >   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>> >> >   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>> >> >   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>> >> >   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>> >> >   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>> >> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>> >> >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>> >> >
>>> >> > BTW, the application runs fine when I have 2 instances running on the
>>> >> > same host (my laptop) or 1 instance on the cluster. The problem
>>> >> > surfaces only when I start the second instance.
>>> >> >
>>> >> > Any help / pointer / area to look into?
>>> >> >
>>> >> > --
>>> >> > Debasish Ghosh
>>> >> > http://manning.com/ghosh2
>>> >> > http://manning.com/ghosh
>>> >> >
>>> >> > Twttr: @debasishg
>>> >> > Blog: http://debasishg.blogspot.com
>>> >> > Code: http://github.com/debasishg
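[Editorial note on the application.server question above: the value of StreamsConfig.APPLICATION_SERVER_CONFIG is advertised to the other instances of the application through the consumer group metadata, so it needs to be a host:port that a peer node can actually reach. "0.0.0.0" is a bind-all address for the local HTTP listener, not a routable host. A minimal sketch of deriving an advertised endpoint; the advertisedEndpoint helper and the fallback to InetAddress.getLocalHost are assumptions for illustration, not from the thread:]

```scala
import java.net.InetAddress

// Hypothetical helper (not from the thread above): keep binding the HTTP
// service to the configured interface, but advertise a routable address in
// application.server. "0.0.0.0:7070" would not be reachable from a peer node.
def advertisedEndpoint(bindInterface: String, port: Int): String = {
  val host =
    if (bindInterface == "0.0.0.0") InetAddress.getLocalHost.getHostAddress
    else bindInterface
  s"$host:$port"
}
```

[The streamingConfig shown earlier could then pass advertisedEndpoint(config.httpInterface, config.httpPort) to APPLICATION_SERVER_CONFIG while still binding the HTTP server itself to config.httpInterface.]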
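[Editorial note: even with a reachable application.server, metadata lookups can fail transiently right after a second instance joins, while the group rebalances and state stores migrate between hosts. Rather than treating the first null from metadataForKey as fatal, a query service can retry briefly with a backoff. A sketch with a hypothetical retryUntilDefined helper; the retry count and backoff are illustrative:]

```scala
import scala.annotation.tailrec

// Hypothetical retry helper (not from the thread above): re-run a lookup that
// may transiently return no result, e.g. a metadata query during a rebalance.
@tailrec
def retryUntilDefined[A](attempts: Int, backoffMs: Long)(lookup: () => Option[A]): Option[A] =
  lookup() match {
    case found @ Some(_)      => found
    case None if attempts > 1 =>
      Thread.sleep(backoffMs)                          // back off before retrying
      retryUntilDefined(attempts - 1, backoffMs)(lookup)
    case None                 => None                  // retries exhausted
  }
```

[The streams.metadataForKey call above could then be wrapped as retryUntilDefined(5, 100)(() => Option(streams.metadataForKey(store, key, serializer))), throwing the IllegalArgumentException only once the retries are exhausted.]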