The config looks OK to me.

On Fri, 28 Jul 2017 at 13:24 Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> I am setting APPLICATION_SERVER_CONFIG, which is possibly what you are
> referring to. Just now I noticed that I may also need to set
> REPLICATION_FACTOR_CONFIG, which needs to be set to 2 (the default is 1).
> Is there anything else that I may be missing ?
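>
> For reference, this is the line I plan to add (a sketch - the value 2
> mirrors the replication factor of my topics):
>
>     settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "2")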
>
>
> regards.
>
> On Fri, Jul 28, 2017 at 5:46 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
> > Hi Damian -
> >
> > I am not sure I understand what you mean .. I have the following set in
> > the application .. Do I need to set anything else at the host level ? An
> > environment variable ?
> >
> >     val streamingConfig = {
> >       val settings = new Properties
> >       settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-weblog-processing")
> >       settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
> >
> >       config.schemaRegistryUrl.foreach { url =>
> >         settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, url)
> >       }
> >
> >       settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
> >       settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
> >
> >       // setting offset reset to earliest so that we can re-run the demo
> >       // code with the same pre-loaded data
> >       // Note: To re-run the demo, you need to use the offset reset tool:
> >       // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
> >       settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
> >
> >       // need this for query service
> >       settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"${config.httpInterface}:${config.httpPort}")
> >
> >       // default is /tmp/kafka-streams
> >       settings.put(StreamsConfig.STATE_DIR_CONFIG, config.stateStoreDir)
> >
> >       // Set the commit interval to 500ms so that any changes are flushed
> >       // frequently and the summary data are updated with low latency.
> >       settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")
> >
> >       settings
> >     }
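> >
> > For context, this is roughly how the config gets consumed downstream (a
> > sketch - builder stands for the KStreamBuilder that holds my topology):
> >
> >     val streams = new KafkaStreams(builder, streamingConfig)
> >     streams.start()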
> >
> > Please explain a bit ..
> >
> > regards.
> >
> >
> > On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy <damian....@gmail.com>
> wrote:
> >
> >> Hi,
> >>
> >> Do you have the application.server property set appropriately for both
> >> hosts?
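> >>
> >> Each instance must advertise its own reachable host:port there. A sketch of
> >> what I mean (the host names and port are placeholders):
> >>
> >>     // on the first host
> >>     settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "host1:7070")
> >>     // on the second host
> >>     settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "host2:7070")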
> >>
> >> The second stack trace is this bug:
> >> https://issues.apache.org/jira/browse/KAFKA-5556
> >>
> >> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh <ghosh.debas...@gmail.com>
> >> wrote:
> >>
> >> > Hi -
> >> >
> >> > In my Kafka Streams application, I have a state store resulting from a
> >> > stateful streaming topology. The environment is
> >> >
> >> >    - Kafka 0.10.2.1
> >> >    - It runs on a DC/OS cluster
> >> >    - I am running Confluent-Kafka 3.2.2 on the cluster
> >> >    - Each topic that I have has 2 partitions with replication factor = 2
> >> >    - The application also has an associated http service that does
> >> >    interactive queries on the state store
> >> >
> >> > The application runs fine when I invoke a single instance. I can use the
> >> > http endpoints to do queries and everything looks good. Problems surface
> >> > when I try to spawn another instance of the application. I use the same
> >> > APPLICATION_ID_CONFIG and the instance starts on a different node of the
> >> > cluster. The data consumption part works fine as the new instance also
> >> > starts consuming from the same topic as the first one. But when I try the
> >> > http query, the metadata fetch fails ..
> >> >
> >> > I have a code snippet like this as part of the query that tries to fetch
> >> > the metadata so that I can locate the host to query on ..
> >> >
> >> >     metadataService.streamsMetadataForStoreAndKey(store, hostKey, stringSerializer) match {
> >> >       case Success(host) => {
> >> >         // hostKey is on another instance. call the other instance to fetch the data.
> >> >         if (!thisHost(host)) {
> >> >           logger.warn(s"Key $hostKey is on another instance not on $host - requerying ..")
> >> >           httpRequester.queryFromHost[Long](host, path)
> >> >         } else {
> >> >           // hostKey is on this instance
> >> >           localStateStoreQuery.queryStateStore(streams, store, hostKey)
> >> >         }
> >> >       }
> >> >       case Failure(ex) => Future.failed(ex)
> >> >     }
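> >> >
> >> > where thisHost just compares the returned metadata with this instance's own
> >> > advertised endpoint - a sketch, reusing the same values I set for
> >> > APPLICATION_SERVER_CONFIG:
> >> >
> >> >     def thisHost(host: HostStoreInfo): Boolean =
> >> >       host.host == config.httpInterface && host.port == config.httpPort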
> >> >
> >> > and metadataService.streamsMetadataForStoreAndKey has the following
> >> > call ..
> >> >
> >> >     streams.metadataForKey(store, key, serializer) match {
> >> >       case null =>
> >> >         throw new IllegalArgumentException(s"Metadata for key $key not found in $store")
> >> >       case metadata =>
> >> >         new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toSet)
> >> >     }
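> >> >
> >> > (I gather metadataForKey can also return StreamsMetadata.NOT_AVAILABLE while
> >> > a rebalance is in progress, so a more defensive version might look like this
> >> > sketch, which I have not tried:)
> >> >
> >> >     streams.metadataForKey(store, key, serializer) match {
> >> >       case null =>
> >> >         throw new IllegalArgumentException(s"Metadata for key $key not found in $store")
> >> >       case metadata if metadata == StreamsMetadata.NOT_AVAILABLE =>
> >> >         throw new IllegalStateException(s"Metadata for key $key not available yet - rebalance in progress")
> >> >       case metadata =>
> >> >         new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toSet)
> >> >     }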
> >> >
> >> > When I start the second instance, streams.metadataForKey returns null for
> >> > any key I pass .. Here's the relevant stack trace ..
> >> >
> >> > java.lang.IllegalArgumentException: Metadata for key mtc.clark.net not found in access-count-per-host
> >> >         at com.xx.fdp.sample.kstream.services.MetadataService.$anonfun$streamsMetadataForStoreAndKey$1(MetadataService.scala:51)
> >> >         at scala.util.Try$.apply(Try.scala:209)
> >> >         at com.xx.fdp.sample.kstream.services.MetadataService.streamsMetadataForStoreAndKey(MetadataService.scala:46)
> >> >         at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchSummaryInfo(KeyValueFetcher.scala:36)
> >> >         at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchAccessCountSummary(KeyValueFetcher.scala:29)
> >> >         at com.xx.fdp.sample.kstream.http.WeblogDSLHttpService.$anonfun$routes$10(WeblogDSLHttpService.scala:41)
> >> >         at akka.http.scaladsl.server.directives.RouteDirectives.$anonfun$complete$1(RouteDirectives.scala:47)
> >> >         at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
> >> >         at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
> >> >         ...
> >> >
> >> > and following this exception I get another one which looks like an
> >> > internal exception that stops the application ..
> >> >
> >> > 09:57:33.731 TKD [StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread -
> >> > stream-thread [StreamThread-1] Streams application error during processing:
> >> > java.lang.IllegalStateException: Attempt to retrieve exception from future which hasn't failed
> >> >         at org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> >> >         at org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> >> >         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> >> >         at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> >> >         at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> >> >         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> >> >         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >> >         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> >> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> >> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> >> >         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> >> >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> >> >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >> >
> >> > BTW the application runs fine when I have 2 instances running on the same
> >> > host (my laptop), or with 1 instance on the cluster. The problem surfaces
> >> > only when I start the second instance on the cluster.
> >> >
> >> > Any help / pointer / area to look into ?
> >> >
> >>
> >
>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
