I am setting APPLICATION_SERVER_CONFIG, which is possibly what you are
referring to. I just noticed that I may also need to set
REPLICATION_FACTOR_CONFIG to 2 (the default is 1).
Is there anything else that I may be missing?
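
To make the two settings concrete, here is a minimal sketch of what I would
set per instance. It uses the plain string keys "application.server" and
"replication.factor" (the values behind StreamsConfig.APPLICATION_SERVER_CONFIG
and StreamsConfig.REPLICATION_FACTOR_CONFIG) so that it runs without a Kafka
dependency. The helper name InstanceSettings and the host names are
hypothetical and not part of my application.

```scala
import java.util.Properties

// Hypothetical helper (not from the application): builds the per-instance
// settings discussed above, using plain string config keys.
object InstanceSettings {
  def apply(host: String, port: Int, replicationFactor: Int): Properties = {
    require(host.nonEmpty, "host must not be empty")
    require(port > 0 && port <= 65535, s"port out of range: $port")
    require(replicationFactor >= 1, "replication factor must be at least 1")

    val settings = new Properties
    // host:port under which this instance serves interactive queries;
    // other instances learn this endpoint through the streams metadata
    settings.put("application.server", s"$host:$port")
    // replication factor for the internal topics created by Kafka Streams
    settings.put("replication.factor", replicationFactor.toString)
    settings
  }
}

// Each instance passes its own endpoint (example host names are made up).
val nodeA = InstanceSettings("node-a.example", 7070, 2)
val nodeB = InstanceSettings("node-b.example", 7070, 2)
```

The point of the sketch is only that the application.server value has to
differ per instance and be reachable from the other node, while the
replication factor is the same everywhere.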


regards.

On Fri, Jul 28, 2017 at 5:46 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Hi Damian -
>
> I am not sure I understand what you mean. I have the following set in the
> application. Do I need to set anything else at the host level, such as an
> environment variable?
>
>     val streamingConfig = {
>       val settings = new Properties
>       settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-weblog-processing")
>       settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
>
>       config.schemaRegistryUrl.foreach { url =>
>         settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, url)
>       }
>
>       settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
>       settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
>
>       // setting offset reset to earliest so that we can re-run the demo code
>       // with the same pre-loaded data
>       // Note: To re-run the demo, you need to use the offset reset tool:
>       // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
>       settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>
>       // need this for query service
>       settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"${config.httpInterface}:${config.httpPort}")
>
>       // default is /tmp/kafka-streams
>       settings.put(StreamsConfig.STATE_DIR_CONFIG, config.stateStoreDir)
>
>       // Set the commit interval to 500ms so that any changes are flushed
>       // frequently and the summary data are updated with low latency.
>       settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")
>
>       settings
>     }
>
> Could you please explain a bit?
>
> regards.
>
>
> On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy <damian....@gmail.com> wrote:
>
>> Hi,
>>
>> Do you have the application.server property set appropriately for both
>> hosts?
>>
>> The second stack trace is this bug:
>> https://issues.apache.org/jira/browse/KAFKA-5556
>>
>> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>> > Hi -
>> >
>> > In my Kafka Streams application, I have a state store resulting from a
>> > stateful streaming topology. The environment is
>> >
>> >    - Kafka 0.10.2.1
>> >    - It runs on a DC/OS cluster
>> >    - I am running Confluent-Kafka 3.2.2 on the cluster
>> >    - Each topic that I have has 2 partitions with replication factor = 2
>> >    - The application also has an associated http service that does
>> >    interactive queries on the state store
>> >
>> > The application runs fine when I invoke a single instance. I can use the
>> > http endpoints to do queries and everything looks good. Problems surface
>> > when I try to spawn another instance of the application. I use the same
>> > APPLICATION_ID_CONFIG and the instance starts on a different node of the
>> > cluster. The data consumption part works fine, as the new instance also
>> > starts consuming from the same topic as the first one. But when I try the
>> > http query, the metadata fetch fails.
>> >
>> > I have a code snippet like this as part of the query. It tries to fetch
>> > the metadata so that I can locate the host to query:
>> >
>> >     metadataService.streamsMetadataForStoreAndKey(store, hostKey, stringSerializer) match {
>> >       case Success(host) => {
>> >         // hostKey is on another instance. call the other instance to fetch the data.
>> >         if (!thisHost(host)) {
>> >           logger.warn(s"Key $hostKey is on another instance not on $host - requerying ..")
>> >           httpRequester.queryFromHost[Long](host, path)
>> >         } else {
>> >           // hostKey is on this instance
>> >           localStateStoreQuery.queryStateStore(streams, store, hostKey)
>> >         }
>> >       }
>> >       case Failure(ex) => Future.failed(ex)
>> >     }
>> >
>> > and the metadataService.streamsMetadataForStoreAndKey has the following
>> > call ..
>> >
>> >     streams.metadataForKey(store, key, serializer) match {
>> >       case null => throw new IllegalArgumentException(s"Metadata for key $key not found in $store")
>> >       case metadata => new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toSet)
>> >     }
>> >
>> > When I start the second instance, streams.metadataForKey returns null for
>> > any key I pass. Here's the relevant stack trace:
>> >
>> > java.lang.IllegalArgumentException: Metadata for key mtc.clark.net not found in access-count-per-host
>> >         at com.xx.fdp.sample.kstream.services.MetadataService.$anonfun$streamsMetadataForStoreAndKey$1(MetadataService.scala:51)
>> >         at scala.util.Try$.apply(Try.scala:209)
>> >         at com.xx.fdp.sample.kstream.services.MetadataService.streamsMetadataForStoreAndKey(MetadataService.scala:46)
>> >         at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchSummaryInfo(KeyValueFetcher.scala:36)
>> >         at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchAccessCountSummary(KeyValueFetcher.scala:29)
>> >         at com.xx.fdp.sample.kstream.http.WeblogDSLHttpService.$anonfun$routes$10(WeblogDSLHttpService.scala:41)
>> >         at akka.http.scaladsl.server.directives.RouteDirectives.$anonfun$complete$1(RouteDirectives.scala:47)
>> >         at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
>> >         at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
>> >         ...
>> >
>> > Following this exception I get another one, which looks like an internal
>> > exception, and it stops the application:
>> >
>> > 09:57:33.731 TKD [StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Streams application error during processing:
>> > java.lang.IllegalStateException: Attempt to retrieve exception from future which hasn't failed
>> >         at org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
>> >         at org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
>> >         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
>> >         at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>> >         at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>> >         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
>> >         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>> >         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>> >         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>> >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>> >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>> >
>> > BTW the application runs fine when I have 2 instances running on the same
>> > host (my laptop) or 1 instance on the cluster. The problem surfaces only
>> > when I start the second instance on the cluster.
>> >
>> > Any help / pointer / area to look into?
>> >
>> > --
>> > Debasish Ghosh
>> > http://manning.com/ghosh2
>> > http://manning.com/ghosh
>> >
>> > Twttr: @debasishg
>> > Blog: http://debasishg.blogspot.com
>> > Code: http://github.com/debasishg
>> >
>>
>



