Hi,

Do you have the application.server property set appropriately for both
hosts?
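
For illustration, here is a minimal sketch of how that setting might look per instance. The object name, helper name, and example values are hypothetical, and I am assuming each instance knows the host and port at which its own HTTP query service is reachable from the other nodes. The config keys are the plain string forms of the standard Streams settings.

```scala
import java.util.Properties

object StreamsSettings {
  // Hypothetical helper: builds the Streams settings for one instance.
  // host/port must be the address of THIS instance's HTTP query service,
  // as seen from the other instances (not localhost on a cluster).
  def streamsProps(appId: String, brokers: String, host: String, port: Int): Properties = {
    val props = new Properties()
    props.put("application.id", appId)              // identical on every instance
    props.put("bootstrap.servers", brokers)
    props.put("application.server", s"$host:$port") // different on every instance
    props
  }
}
```

Each instance would pass its own host and port, e.g. StreamsSettings.streamsProps("weblog-app", "broker:9092", "10.0.0.5", 7070) on one node and a different host:port on the other.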

The second stack trace is this bug:
https://issues.apache.org/jira/browse/KAFKA-5556

On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Hi -
>
> In my Kafka Streams application, I have a state store resulting from a
> stateful streaming topology. The environment is
>
>    - Kafka 0.10.2.1
>    - It runs on a DC/OS cluster
>    - I am running Confluent-Kafka 3.2.2 on the cluster
>    - Each topic that I have has 2 partitions with replication factor = 2
>    - The application also has an associated http service that does
>    interactive queries on the state store
>
> The application runs fine when I invoke a single instance. I can use the
> http endpoints to do queries and everything looks good. Problems surface
> when I try to spawn another instance of the application. I use the same
> APPLICATION_ID_CONFIG and the instance starts on a different node of the
> cluster. The data consumption part works fine as the new instance also
> starts consuming from the same topic as the first one. But when I try the
> http query, the metadata fetch fails ..
>
> I have some code snippet like this as part of the query that tries to fetch
> the metadata so that I can locate the host to query on ..
>
>     metadataService.streamsMetadataForStoreAndKey(store, hostKey, stringSerializer) match {
>       case Success(host) => {
>         // hostKey is on another instance. call the other instance to fetch the data.
>         if (!thisHost(host)) {
>           logger.warn(s"Key $hostKey is on another instance not on $host - requerying ..")
>           httpRequester.queryFromHost[Long](host, path)
>         } else {
>           // hostKey is on this instance
>           localStateStoreQuery.queryStateStore(streams, store, hostKey)
>         }
>       }
>       case Failure(ex) => Future.failed(ex)
>     }
>
> and the metadataService.streamsMetadataForStoreAndKey has the following
> call ..
>
>     streams.metadataForKey(store, key, serializer) match {
>       case null => throw new IllegalArgumentException(s"Metadata for key $key not found in $store")
>       case metadata => new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toSet)
>     }
>
> When I start the second instance, streams.metadataForKey returns null for
> any key I pass .. Here's the relevant stack trace ..
>
> java.lang.IllegalArgumentException: Metadata for key mtc.clark.net not found in access-count-per-host
>         at com.xx.fdp.sample.kstream.services.MetadataService.$anonfun$streamsMetadataForStoreAndKey$1(MetadataService.scala:51)
>         at scala.util.Try$.apply(Try.scala:209)
>         at com.xx.fdp.sample.kstream.services.MetadataService.streamsMetadataForStoreAndKey(MetadataService.scala:46)
>         at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchSummaryInfo(KeyValueFetcher.scala:36)
>         at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchAccessCountSummary(KeyValueFetcher.scala:29)
>         at com.xx.fdp.sample.kstream.http.WeblogDSLHttpService.$anonfun$routes$10(WeblogDSLHttpService.scala:41)
>         at akka.http.scaladsl.server.directives.RouteDirectives.$anonfun$complete$1(RouteDirectives.scala:47)
>         at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
>         at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
>         ...
>
> and following this exception I get another one which looks like an internal
> exception that stops the application ..
>
> 09:57:33.731 TKD [StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Streams application error during processing:
> java.lang.IllegalStateException: Attempt to retrieve exception from future which hasn't failed
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>         at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>         at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>
> BTW the application runs fine when I have 2 instances running on the same
> host (my laptop) or 1 instance on the cluster. The problem surfaces only
> when I start the second instance on the cluster.
>
> Any help / pointer / area to look into ?
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
