The config looks ok to me
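Since the exchange below turns on two StreamsConfig settings, here is a minimal sketch of how they could be set. The advertisedHost/advertisedPort parameters are hypothetical stand-ins for whatever each instance knows as its own externally reachable endpoint, and the replication factor of 2 simply mirrors the topic setup described further down the thread.

    import java.util.Properties
    import org.apache.kafka.streams.StreamsConfig

    // Sketch only. application.id is identical on every instance, while
    // application.server must name the endpoint at which *this* instance's
    // HTTP query service can be reached from the other instances, so each
    // instance advertises a different value. replication.factor applies to
    // the internal changelog/repartition topics (default is 1).
    def interactiveQuerySettings(advertisedHost: String, advertisedPort: Int): Properties = {
      val settings = new Properties
      settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-weblog-processing")
      settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"$advertisedHost:$advertisedPort")
      settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "2")
      settings
    }

In those terms, Damian's question further down amounts to checking that the two DC/OS instances end up advertising two distinct host:port pairs that are reachable from each other.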

On Fri, 28 Jul 2017 at 13:24 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> I am setting APPLICATION_SERVER_CONFIG, which is possibly what u r
> referring to. Just now I noticed that I may also need to set
> REPLICATION_FACTOR_CONFIG, which needs to be set to 2 (default is 1).
> Anything else that I may be missing ?
>
> regards.
>
> On Fri, Jul 28, 2017 at 5:46 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
> > Hi Damien -
> >
> > I am not sure I understand what u mean .. I have the following set in the
> > application .. Do I need to set anything else at the host level ?
> > Environment variable ?
> >
> > val streamingConfig = {
> >   val settings = new Properties
> >   settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-weblog-processing")
> >   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
> >
> >   config.schemaRegistryUrl.foreach { url =>
> >     settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, url)
> >   }
> >
> >   settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
> >   settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
> >
> >   // setting offset reset to earliest so that we can re-run the demo code
> >   // with the same pre-loaded data
> >   // Note: To re-run the demo, you need to use the offset reset tool:
> >   // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
> >   settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
> >
> >   // need this for query service
> >   settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"${config.httpInterface}:${config.httpPort}")
> >
> >   // default is /tmp/kafka-streams
> >   settings.put(StreamsConfig.STATE_DIR_CONFIG, config.stateStoreDir)
> >
> >   // Set the commit interval to 500ms so that any changes are flushed
> >   // frequently and the summary data are updated with low latency.
> >   settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")
> >
> >   settings
> > }
> >
> > Please explain a bit ..
> >
> > regards.
> >
> > On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy <damian....@gmail.com> wrote:
> >
> >> Hi,
> >>
> >> Do you have the application.server property set appropriately for both
> >> hosts?
> >>
> >> The second stack trace is this bug:
> >> https://issues.apache.org/jira/browse/KAFKA-5556
> >>
> >> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh <ghosh.debas...@gmail.com>
> >> wrote:
> >>
> >> > Hi -
> >> >
> >> > In my Kafka Streams application, I have a state store resulting from a
> >> > stateful streaming topology. The environment is
> >> >
> >> >    - Kafka 0.10.2.1
> >> >    - It runs on a DC/OS cluster
> >> >    - I am running Confluent-Kafka 3.2.2 on the cluster
> >> >    - Each topic that I have has 2 partitions with replication factor = 2
> >> >    - The application also has an associated http service that does
> >> >      interactive queries on the state store
> >> >
> >> > The application runs fine when I invoke a single instance. I can use the
> >> > http endpoints to do queries and everything looks good. Problems surface
> >> > when I try to spawn another instance of the application. I use the same
> >> > APPLICATION_ID_CONFIG and the instance starts on a different node of the
> >> > cluster. The data consumption part works fine as the new instance also
> >> > starts consuming from the same topic as the first one. But when I try the
> >> > http query, the metadata fetch fails ..
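As a quick diagnostic for the metadata fetch failure just described, each instance can dump what the group currently knows about every instance. A minimal sketch, assuming a running KafkaStreams handle like the streams value used in the code below:

    import scala.collection.JavaConverters._
    import org.apache.kafka.streams.KafkaStreams

    // Sketch: print the application.server endpoint and the state stores that
    // every instance in the group has registered. If only one host:port pair
    // shows up after the second instance joins, that instance is not
    // advertising itself to the rest of the group.
    def dumpStreamsMetadata(streams: KafkaStreams): Unit =
      streams.allMetadata().asScala.foreach { m =>
        println(s"host=${m.host()} port=${m.port()} stores=${m.stateStoreNames().asScala.mkString(",")}")
      }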
> >> >
> >> > I have some code snippet like this as part of the query that tries to fetch
> >> > the metadata so that I can locate the host to query on ..
> >> >
> >> > metadataService.streamsMetadataForStoreAndKey(store, hostKey, stringSerializer) match {
> >> >   case Success(host) => {
> >> >     // hostKey is on another instance. call the other instance to fetch the data.
> >> >     if (!thisHost(host)) {
> >> >       logger.warn(s"Key $hostKey is on another instance not on $host - requerying ..")
> >> >       httpRequester.queryFromHost[Long](host, path)
> >> >     } else {
> >> >       // hostKey is on this instance
> >> >       localStateStoreQuery.queryStateStore(streams, store, hostKey)
> >> >     }
> >> >   }
> >> >   case Failure(ex) => Future.failed(ex)
> >> > }
> >> >
> >> > and the metadataService.streamsMetadataForStoreAndKey has the following call ..
> >> >
> >> > streams.metadataForKey(store, key, serializer) match {
> >> >   case null => throw new IllegalArgumentException(s"Metadata for key $key not found in $store")
> >> >   case metadata => new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toSet)
> >> > }
> >> >
> >> > When I start the second instance, streams.metadataForKey returns null for
> >> > any key I pass .. Here's the relevant stack trace ..
> >> >
> >> > java.lang.IllegalArgumentException: Metadata for key mtc.clark.net not found in access-count-per-host
> >> >   at com.xx.fdp.sample.kstream.services.MetadataService.$anonfun$streamsMetadataForStoreAndKey$1(MetadataService.scala:51)
> >> >   at scala.util.Try$.apply(Try.scala:209)
> >> >   at com.xx.fdp.sample.kstream.services.MetadataService.streamsMetadataForStoreAndKey(MetadataService.scala:46)
> >> >   at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchSummaryInfo(KeyValueFetcher.scala:36)
> >> >   at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchAccessCountSummary(KeyValueFetcher.scala:29)
> >> >   at com.xx.fdp.sample.kstream.http.WeblogDSLHttpService.$anonfun$routes$10(WeblogDSLHttpService.scala:41)
> >> >   at akka.http.scaladsl.server.directives.RouteDirectives.$anonfun$complete$1(RouteDirectives.scala:47)
> >> >   at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
> >> >   at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
> >> >   ...
> >> >
> >> > and following this exception I get another one which looks like an internal
> >> > exception that stops the application ..
> >> >
> >> > 09:57:33.731 TKD [StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread -
> >> > stream-thread [StreamThread-1] Streams application error during processing:
> >> > java.lang.IllegalStateException: Attempt to retrieve exception from future which hasn't failed
> >> >   at org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> >> >   at org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> >> >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> >> >   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> >> >   at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> >> >   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> >> >   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >> >   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> >> >   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> >> >   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> >> >   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> >> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> >> >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >> >
> >> > BTW the application runs fine when I have 2 instances running on the same
> >> > host (my laptop) or on 1 instance on the cluster. The problem surfaces only
> >> > when I start the second instance.
> >> >
> >> > Any help / pointer / area to look into ?
> >> >
> >> > --
> >> > Debasish Ghosh
> >> > http://manning.com/ghosh2
> >> > http://manning.com/ghosh
> >> >
> >> > Twttr: @debasishg
> >> > Blog: http://debasishg.blogspot.com
> >> > Code: http://github.com/debasishg
> >> >
> >
> > --
> > Debasish Ghosh
> > http://manning.com/ghosh2
> > http://manning.com/ghosh
> >
> > Twttr: @debasishg
> > Blog: http://debasishg.blogspot.com
> > Code: http://github.com/debasishg
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
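One general note on the null result discussed above: streams.metadataForKey can legitimately return null while the group is rebalancing, for example right after a second instance joins, so query services often retry briefly before treating it as an error. A minimal sketch of that pattern follows; the retry count and sleep are arbitrary, and this is only a defensive measure, not a fix for a misconfigured application.server or for KAFKA-5556.

    import org.apache.kafka.common.serialization.Serializer
    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.state.StreamsMetadata

    // Sketch: retry metadataForKey a few times, since the key-to-host mapping
    // is briefly unknown while partitions are being reassigned.
    def metadataWithRetry[K](streams: KafkaStreams, store: String, key: K,
                             serializer: Serializer[K], attempts: Int = 5): StreamsMetadata =
      streams.metadataForKey(store, key, serializer) match {
        case null if attempts > 1 =>
          Thread.sleep(100)
          metadataWithRetry(streams, store, key, serializer, attempts - 1)
        case null =>
          throw new IllegalArgumentException(s"Metadata for key $key not found in $store")
        case metadata => metadata
      }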