Hi Damian -

I am not sure I understand what you mean. I have the following set in the
application. Do I need to set anything else at the host level, such as an
environment variable?

    val streamingConfig = {
      val settings = new Properties
      settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-weblog-processing")
      settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)

      config.schemaRegistryUrl.foreach { url =>
        settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, url)
      }

      settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
      settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)

      // setting offset reset to earliest so that we can re-run the demo code
      // with the same pre-loaded data
      // Note: To re-run the demo, you need to use the offset reset tool:
      // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
      settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

      // need this for query service
      settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"${config.httpInterface}:${config.httpPort}")

      // default is /tmp/kafka-streams
      settings.put(StreamsConfig.STATE_DIR_CONFIG, config.stateStoreDir)

      // Set the commit interval to 500ms so that any changes are flushed
      // frequently and the summary data are updated with low latency.
      settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")

      settings
    }
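
For reference, here is a minimal sketch of how the application.server value could be derived per instance, so that each host advertises its own reachable host:port instead of sharing one configured value. It does not need Kafka on the classpath. The environment variable names HOST and PORT0, the object name AppServerConfig and the helper names are assumptions for illustration, and the "local only" check is just a heuristic.

```scala
import scala.util.Try

object AppServerConfig {
  // String key behind StreamsConfig.APPLICATION_SERVER_CONFIG.
  val ApplicationServerKey = "application.server"

  // Build "host:port" from the environment, falling back to the configured values.
  // HOST and PORT0 are assumed variable names; adjust them to whatever the scheduler provides.
  def advertisedEndpoint(env: Map[String, String], fallbackHost: String, fallbackPort: Int): String = {
    val host = env.get("HOST").map(_.trim).filter(_.nonEmpty).getOrElse(fallbackHost)
    val port = env.get("PORT0").flatMap(p => Try(p.trim.toInt).toOption).getOrElse(fallbackPort)
    s"$host:$port"
  }

  // Heuristic: an endpoint bound to a loopback or wildcard address cannot be
  // used by another instance to reach this one.
  def looksLocalOnly(endpoint: String): Boolean = {
    val host = endpoint.takeWhile(_ != ':')
    Set("localhost", "127.0.0.1", "0.0.0.0").contains(host)
  }
}
```

In the config above this would be used roughly as settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, AppServerConfig.advertisedEndpoint(sys.env, config.httpInterface, config.httpPort)), assuming config.httpPort is an Int. Logging a warning when looksLocalOnly returns true on a cluster node would make it easier to spot an endpoint that other hosts cannot reach.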

Could you please explain a bit more?

regards.


On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi,
>
> Do you have the application.server property set appropriately for both
> hosts?
>
> The second stack trace is this bug:
> https://issues.apache.org/jira/browse/KAFKA-5556
>
> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
> > Hi -
> >
> > In my Kafka Streams application, I have a state store resulting from a
> > stateful streaming topology. The environment is
> >
> >    - Kafka 0.10.2.1
> >    - It runs on a DC/OS cluster
> >    - I am running Confluent-Kafka 3.2.2 on the cluster
> >    - Each topic that I have has 2 partitions with replication factor = 2
> >    - The application also has an associated http service that does
> >    interactive queries on the state store
> >
> > The application runs fine when I invoke a single instance. I can use the
> > http endpoints to do queries and everything looks good. Problems surface
> > when I try to spawn another instance of the application. I use the same
> > APPLICATION_ID_CONFIG and the instance starts on a different node of the
> > cluster. The data consumption part works fine as the new instance also
> > starts consuming from the same topic as the first one. But when I try the
> > http query, the metadata fetch fails ..
> >
> > I have some code snippet like this as part of the query that tries to fetch
> > the metadata so that I can locate the host to query on ..
> >
> >     metadataService.streamsMetadataForStoreAndKey(store, hostKey, stringSerializer) match {
> >       case Success(host) => {
> >         // hostKey is on another instance. call the other instance to fetch the data.
> >         if (!thisHost(host)) {
> >           logger.warn(s"Key $hostKey is on another instance not on $host - requerying ..")
> >           httpRequester.queryFromHost[Long](host, path)
> >         } else {
> >           // hostKey is on this instance
> >           localStateStoreQuery.queryStateStore(streams, store, hostKey)
> >         }
> >       }
> >       case Failure(ex) => Future.failed(ex)
> >     }
> >
> > and the metadataService.streamsMetadataForStoreAndKey has the following
> > call ..
> >
> >     streams.metadataForKey(store, key, serializer) match {
> >       case null => throw new IllegalArgumentException(s"Metadata for key $key not found in $store")
> >       case metadata => new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toSet)
> >     }
> >
> > When I start the second instance, streams.metadataForKey returns null for
> > any key I pass .. Here's the relevant stack trace ..
> >
> > java.lang.IllegalArgumentException: Metadata for key mtc.clark.net not found in access-count-per-host
> >         at com.xx.fdp.sample.kstream.services.MetadataService.$anonfun$streamsMetadataForStoreAndKey$1(MetadataService.scala:51)
> >         at scala.util.Try$.apply(Try.scala:209)
> >         at com.xx.fdp.sample.kstream.services.MetadataService.streamsMetadataForStoreAndKey(MetadataService.scala:46)
> >         at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchSummaryInfo(KeyValueFetcher.scala:36)
> >         at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchAccessCountSummary(KeyValueFetcher.scala:29)
> >         at com.xx.fdp.sample.kstream.http.WeblogDSLHttpService.$anonfun$routes$10(WeblogDSLHttpService.scala:41)
> >         at akka.http.scaladsl.server.directives.RouteDirectives.$anonfun$complete$1(RouteDirectives.scala:47)
> >         at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
> >         at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
> >         ...
> >
> > and following this exception I get another one which looks like an internal
> > exception that stops the application ..
> >
> > 09:57:33.731 TKD [StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Streams application error during processing:
> > java.lang.IllegalStateException: Attempt to retrieve exception from future which hasn't failed
> >         at org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> >         at org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> >         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> >         at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> >         at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> >         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> >         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >
> > BTW the application runs fine when I have 2 instances running on the same
> > host (my laptop) or 1 instance on the cluster. The problem surfaces only
> > when I start the second instance on the cluster.
> >
> > Any help / pointer / area to look into ?
> >
> > --
> > Debasish Ghosh
> > http://manning.com/ghosh2
> > http://manning.com/ghosh
> >
> > Twttr: @debasishg
> > Blog: http://debasishg.blogspot.com
> > Code: http://github.com/debasishg
> >
>
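
On the null returned by streams.metadataForKey in the quoted snippet: the KafkaStreams javadoc describes null as the result when no metadata could be found. Below is a minimal sketch, not the actual service code, of turning that null into an explicit "not yet available" outcome instead of throwing. The HTTP layer could then answer with a retryable response. HostStoreInfo is redeclared here as a stand-in so the sketch compiles without Kafka, and MetadataResult, Located, NotYetAvailable and MetadataLookup are hypothetical names.

```scala
// Stand-in for the HostStoreInfo used in the quoted code.
final case class HostStoreInfo(host: String, port: Int, storeNames: Set[String])

// Hypothetical result type: either the owning host is known, or metadata is missing for now.
sealed trait MetadataResult
final case class Located(info: HostStoreInfo) extends MetadataResult
case object NotYetAvailable extends MetadataResult

object MetadataLookup {
  // `fetch` stands in for a call that wraps streams.metadataForKey(store, key, serializer)
  // and may return null when no metadata is known for the key.
  def locate(fetch: () => HostStoreInfo): MetadataResult =
    Option(fetch()) match {
      case Some(info) => Located(info)
      case None       => NotYetAvailable
    }
}
```

With this shape, the caller pattern-matches on Located and NotYetAvailable, and only genuine failures travel through Failure(ex).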



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
