Hmmm, I'm not sure that is going to work, as both nodes will have the same
setting for StreamsConfig.APPLICATION_SERVER_CONFIG, i.e., 0.0.0.0:7070
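A minimal sketch of what a per-instance setting could look like, assuming each node can resolve its own cluster-routable hostname (the `AppServerConfig` object and its `port` parameter are illustrative, not from the original code):

```scala
import java.net.InetAddress
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

// Illustrative sketch: application.server must advertise a host:port that the
// OTHER instances can reach, not a wildcard bind address like 0.0.0.0.
// One option is to resolve the local hostname at startup on each node.
object AppServerConfig {

  def endpoint(port: Int): String = {
    // Assumption: the node's canonical hostname is routable inside the cluster.
    val host = InetAddress.getLocalHost.getCanonicalHostName
    s"$host:$port"
  }

  def settingsFor(port: Int): Properties = {
    val settings = new Properties
    // Each instance now advertises a distinct, reachable endpoint,
    // so metadataForKey can map a key to the right host.
    settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, endpoint(port))
    settings
  }
}
```

The HTTP service can still bind to 0.0.0.0 locally; it is only the advertised metadata that needs the real hostname.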
On Fri, 28 Jul 2017 at 16:02 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> The log file is a huge one. I can send it to you though. Before that let
> me confirm one point ..
>
> I set the APPLICATION_SERVER_CONFIG to
> s"${config.httpInterface}:${config.httpPort}". In my case the
> httpInterface is "0.0.0.0" and the port is set to 7070. Since the two
> instances start on different nodes, this should be ok - right ?
>
> regards.
>
> On Fri, Jul 28, 2017 at 8:18 PM, Damian Guy <damian....@gmail.com> wrote:
>
>> Do you have any logs that might help to work out what is going wrong?
>>
>> On Fri, 28 Jul 2017 at 14:16 Damian Guy <damian....@gmail.com> wrote:
>>
>>> The config looks ok to me
>>>
>>> On Fri, 28 Jul 2017 at 13:24 Debasish Ghosh <ghosh.debas...@gmail.com>
>>> wrote:
>>>
>>>> I am setting APPLICATION_SERVER_CONFIG, which is possibly what u r
>>>> referring to. Just now I noticed that I may also need to set
>>>> REPLICATION_FACTOR_CONFIG, which needs to be set to 2 (default is 1).
>>>> Anything else that I may be missing ?
>>>>
>>>> regards.
>>>>
>>>> On Fri, Jul 28, 2017 at 5:46 PM, Debasish Ghosh <
>>>> ghosh.debas...@gmail.com> wrote:
>>>>
>>>> > Hi Damian -
>>>> >
>>>> > I am not sure I understand what u mean .. I have the following set in
>>>> > the application .. Do I need to set anything else at the host level ?
>>>> > Environment variable ?
>>>> >
>>>> > val streamingConfig = {
>>>> >   val settings = new Properties
>>>> >   settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-weblog-processing")
>>>> >   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
>>>> >
>>>> >   config.schemaRegistryUrl.foreach { url =>
>>>> >     settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, url)
>>>> >   }
>>>> >
>>>> >   settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
>>>> >   settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
>>>> >
>>>> >   // setting offset reset to earliest so that we can re-run the demo
>>>> >   // code with the same pre-loaded data
>>>> >   // Note: To re-run the demo, you need to use the offset reset tool:
>>>> >   // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
>>>> >   settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>>>> >
>>>> >   // need this for query service
>>>> >   settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"${config.httpInterface}:${config.httpPort}")
>>>> >
>>>> >   // default is /tmp/kafka-streams
>>>> >   settings.put(StreamsConfig.STATE_DIR_CONFIG, config.stateStoreDir)
>>>> >
>>>> >   // Set the commit interval to 500ms so that any changes are flushed
>>>> >   // frequently and the summary data are updated with low latency.
>>>> >   settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")
>>>> >
>>>> >   settings
>>>> > }
>>>> >
>>>> > Please explain a bit ..
>>>> >
>>>> > regards.
>>>> >
>>>> > On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy <damian....@gmail.com>
>>>> > wrote:
>>>> >
>>>> >> Hi,
>>>> >>
>>>> >> Do you have the application.server property set appropriately for
>>>> >> both hosts?
>>>> >>
>>>> >> The second stack trace is this bug:
>>>> >> https://issues.apache.org/jira/browse/KAFKA-5556
>>>> >>
>>>> >> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh <
>>>> >> ghosh.debas...@gmail.com> wrote:
>>>> >>
>>>> >> > Hi -
>>>> >> >
>>>> >> > In my Kafka Streams application, I have a state store resulting
>>>> >> > from a stateful streaming topology. The environment is
>>>> >> >
>>>> >> > - Kafka 0.10.2.1
>>>> >> > - It runs on a DC/OS cluster
>>>> >> > - I am running Confluent-Kafka 3.2.2 on the cluster
>>>> >> > - Each topic that I have has 2 partitions with replication factor = 2
>>>> >> > - The application also has an associated http service that does
>>>> >> >   interactive queries on the state store
>>>> >> >
>>>> >> > The application runs fine when I invoke a single instance. I can
>>>> >> > use the http endpoints to do queries and everything looks good.
>>>> >> > Problems surface when I try to spawn another instance of the
>>>> >> > application. I use the same APPLICATION_ID_CONFIG and the instance
>>>> >> > starts on a different node of the cluster. The data consumption
>>>> >> > part works fine as the new instance also starts consuming from the
>>>> >> > same topic as the first one. But when I try the http query, the
>>>> >> > metadata fetch fails ..
>>>> >> >
>>>> >> > I have some code snippet like this as part of the query that tries
>>>> >> > to fetch the metadata so that I can locate the host to query on ..
>>>> >> >
>>>> >> > metadataService.streamsMetadataForStoreAndKey(store, hostKey,
>>>> >> >   stringSerializer) match {
>>>> >> >   case Success(host) => {
>>>> >> >     if (!thisHost(host)) {
>>>> >> >       // hostKey is on another instance. call the other instance
>>>> >> >       // to fetch the data.
>>>> >> >       logger.warn(s"Key $hostKey is on another instance not on $host -
>>>> >> >         requerying ..")
>>>> >> >       httpRequester.queryFromHost[Long](host, path)
>>>> >> >     } else {
>>>> >> >       // hostKey is on this instance
>>>> >> >       localStateStoreQuery.queryStateStore(streams, store, hostKey)
>>>> >> >     }
>>>> >> >   }
>>>> >> >   case Failure(ex) => Future.failed(ex)
>>>> >> > }
>>>> >> >
>>>> >> > and the metadataService.streamsMetadataForStoreAndKey has the
>>>> >> > following call ..
>>>> >> >
>>>> >> > streams.metadataForKey(store, key, serializer) match {
>>>> >> >   case null => throw new IllegalArgumentException(s"Metadata for
>>>> >> >     key $key not found in $store")
>>>> >> >   case metadata => new HostStoreInfo(metadata.host, metadata.port,
>>>> >> >     metadata.stateStoreNames.asScala.toSet)
>>>> >> > }
>>>> >> >
>>>> >> > When I start the second instance, streams.metadataForKey returns
>>>> >> > null for any key I pass .. Here's the relevant stack trace ..
>>>> >> >
>>>> >> > java.lang.IllegalArgumentException: Metadata for key mtc.clark.net
>>>> >> > not found in access-count-per-host
>>>> >> >   at com.xx.fdp.sample.kstream.services.MetadataService.$anonfun$streamsMetadataForStoreAndKey$1(MetadataService.scala:51)
>>>> >> >   at scala.util.Try$.apply(Try.scala:209)
>>>> >> >   at com.xx.fdp.sample.kstream.services.MetadataService.streamsMetadataForStoreAndKey(MetadataService.scala:46)
>>>> >> >   at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchSummaryInfo(KeyValueFetcher.scala:36)
>>>> >> >   at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchAccessCountSummary(KeyValueFetcher.scala:29)
>>>> >> >   at com.xx.fdp.sample.kstream.http.WeblogDSLHttpService.$anonfun$routes$10(WeblogDSLHttpService.scala:41)
>>>> >> >   at akka.http.scaladsl.server.directives.RouteDirectives.$anonfun$complete$1(RouteDirectives.scala:47)
>>>> >> >   at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
>>>> >> >   at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
>>>> >> >   ...
>>>> >> >
>>>> >> > and following this exception I get another one which looks like an
>>>> >> > internal exception that stops the application ..
>>>> >> >
>>>> >> > 09:57:33.731 TKD [StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread -
>>>> >> > stream-thread [StreamThread-1] Streams application error during processing:
>>>> >> > java.lang.IllegalStateException: Attempt to retrieve exception from
>>>> >> > future which hasn't failed
>>>> >> >   at org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
>>>> >> >   at org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
>>>> >> >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
>>>> >> >   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>>>> >> >   at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>>>> >> >   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
>>>> >> >   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>> >> >   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>>> >> >   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>>> >> >   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>>> >> >   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>>> >> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>>> >> >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> >> >
>>>> >> > BTW the application runs fine when I have 2 instances running on
>>>> >> > the same host (my laptop) or on 1 instance on the cluster. The
>>>> >> > problem surfaces only when I start the second instance.
>>>> >> >
>>>> >> > Any help / pointer / area to look into ?
>>>> >> >
>>>> >> > --
>>>> >> > Debasish Ghosh
>>>> >> > http://manning.com/ghosh2
>>>> >> > http://manning.com/ghosh
>>>> >> >
>>>> >> > Twttr: @debasishg
>>>> >> > Blog: http://debasishg.blogspot.com
>>>> >> > Code: http://github.com/debasishg
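The null from streams.metadataForKey above can also show up transiently while the group is rebalancing (e.g. right after a second instance joins), so a query service may want to retry briefly before treating it as a hard error. This is an illustrative sketch, not the poster's code; `MetadataLookup` and its retry/backoff parameters are hypothetical:

```scala
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.StreamsMetadata
import scala.annotation.tailrec

// Hypothetical helper: treat a null result as possibly transient (rebalance in
// progress) and retry a bounded number of times before giving up.
object MetadataLookup {

  @tailrec
  def metadataWithRetry[K](streams: KafkaStreams,
                           store: String,
                           key: K,
                           serializer: Serializer[K],
                           retries: Int = 5,
                           backoffMs: Long = 100): StreamsMetadata =
    streams.metadataForKey(store, key, serializer) match {
      case null if retries > 0 =>
        Thread.sleep(backoffMs) // give the rebalance a moment to settle
        metadataWithRetry(streams, store, key, serializer, retries - 1, backoffMs)
      case null =>
        // still unavailable: surface the same error the original code threw
        throw new IllegalArgumentException(s"Metadata for key $key not found in $store")
      case metadata =>
        metadata
    }
}
```

Note this only papers over transient unavailability; if both instances advertise the same application.server value (e.g. 0.0.0.0:7070), the metadata will stay wrong and no amount of retrying helps.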