The log file is huge, but I can send it to you. Before that, let me
confirm one point ..

I set APPLICATION_SERVER_CONFIG to
s"${config.httpInterface}:${config.httpPort}". In my case httpInterface is
"0.0.0.0" and the port is set to 7070. Since the two instances start on
different nodes, this should be OK, right?
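
Though one thing I want to double-check on my side: if the value of
APPLICATION_SERVER_CONFIG is advertised as-is to the other instances (rather
than used as a bind address), then "0.0.0.0" would not be routable from a
remote node. A minimal sketch of how I could derive a routable value instead
(the host names here are hypothetical):

```scala
// Sketch (hypothetical names): APPLICATION_SERVER_CONFIG is not a bind
// address -- Kafka Streams propagates it verbatim to the other instances,
// which use it to route interactive queries. Binding the HTTP server to
// 0.0.0.0 is fine, but advertising "0.0.0.0:7070" would make a remote
// instance effectively query itself.
object AdvertisedEndpoint {
  // bindInterface: what the local HTTP server listens on (may be 0.0.0.0)
  // advertisedHost: a host name / IP that the other instances can reach
  def forConfig(bindInterface: String, advertisedHost: String, port: Int): String = {
    val host = if (bindInterface == "0.0.0.0") advertisedHost else bindInterface
    s"$host:$port"
  }

  def main(args: Array[String]): Unit =
    // e.g. on one node of the cluster (node name is made up):
    println(forConfig("0.0.0.0", "node-1.dcos.local", 7070))
}
```

i.e. I would keep binding the HTTP server to 0.0.0.0 but advertise the node's
externally reachable address.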

regards.
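
P.S. One more thought, sketched with a hypothetical helper (this is not in my
code yet): when the second instance joins, the group rebalances, and during
that window the metadata lookup can legitimately come back empty. Retrying the
lookup a few times before failing would separate a transient rebalance from a
real misconfiguration:

```scala
import scala.util.{Failure, Success, Try}

object RetryingLookup {
  // Retry a Try-returning lookup up to `attempts` times, sleeping between tries.
  @annotation.tailrec
  def withRetries[A](attempts: Int, delayMs: Long)(lookup: => Try[A]): Try[A] =
    lookup match {
      case s @ Success(_)                  => s
      case f @ Failure(_) if attempts <= 1 => f
      case Failure(_) =>
        Thread.sleep(delayMs)
        withRetries(attempts - 1, delayMs)(lookup)
    }
}
```

e.g. wrapping the metadataService.streamsMetadataForStoreAndKey(...) call as
RetryingLookup.withRetries(5, 200)(...).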

On Fri, Jul 28, 2017 at 8:18 PM, Damian Guy <damian....@gmail.com> wrote:

> Do you have any logs that might help to work out what is going wrong?
>
> On Fri, 28 Jul 2017 at 14:16 Damian Guy <damian....@gmail.com> wrote:
>
>> The config looks ok to me
>>
>> On Fri, 28 Jul 2017 at 13:24 Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> I am setting APPLICATION_SERVER_CONFIG, which is possibly what you are
>>> referring to. Just now I noticed that I may also need to set
>>> REPLICATION_FACTOR_CONFIG, which needs to be set to 2 (the default is 1).
>>> Anything else that I may be missing?
>>>
>>>
>>> regards.
>>>
>>> On Fri, Jul 28, 2017 at 5:46 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
>>> wrote:
>>>
>>> > Hi Damian -
>>> >
>>> > I am not sure I understand what you mean .. I have the following set in
>>> > the application .. Do I need to set anything else at the host level? An
>>> > environment variable?
>>> >
>>> >     val streamingConfig = {
>>> >       val settings = new Properties
>>> >       settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-weblog-processing")
>>> >       settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
>>> >
>>> >       config.schemaRegistryUrl.foreach { url =>
>>> >         settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, url)
>>> >       }
>>> >
>>> >       settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
>>> >       settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
>>> >
>>> >       // setting offset reset to earliest so that we can re-run the demo code
>>> >       // with the same pre-loaded data
>>> >       // Note: To re-run the demo, you need to use the offset reset tool:
>>> >       // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
>>> >       settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>>> >
>>> >       // need this for the query service
>>> >       settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"${config.httpInterface}:${config.httpPort}")
>>> >
>>> >       // default is /tmp/kafka-streams
>>> >       settings.put(StreamsConfig.STATE_DIR_CONFIG, config.stateStoreDir)
>>> >
>>> >       // Set the commit interval to 500 ms so that any changes are flushed
>>> >       // frequently and the summary data are updated with low latency.
>>> >       settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")
>>> >
>>> >       settings
>>> >     }
>>> >
>>> > Please explain a bit ..
>>> >
>>> > regards.
>>> >
>>> >
>>> > On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy <damian....@gmail.com>
>>> wrote:
>>> >
>>> >> Hi,
>>> >>
>>> >> Do you have the application.server property set appropriately for both
>>> >> hosts?
>>> >>
>>> >> The second stack trace is this bug:
>>> >> https://issues.apache.org/jira/browse/KAFKA-5556
>>> >>
>>> >> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh <ghosh.debas...@gmail.com>
>>> >> wrote:
>>> >>
>>> >> > Hi -
>>> >> >
>>> >> > In my Kafka Streams application, I have a state store resulting from a
>>> >> > stateful streaming topology. The environment is
>>> >> >
>>> >> >    - Kafka 0.10.2.1
>>> >> >    - It runs on a DC/OS cluster
>>> >> >    - I am running Confluent-Kafka 3.2.2 on the cluster
>>> >> >    - Each topic that I have has 2 partitions with replication factor = 2
>>> >> >    - The application also has an associated http service that does
>>> >> >    interactive queries on the state store
>>> >> >
>>> >> > The application runs fine when I invoke a single instance. I can use
>>> >> > the http endpoints to do queries and everything looks good. Problems
>>> >> > surface when I try to spawn another instance of the application. I use
>>> >> > the same APPLICATION_ID_CONFIG, and the instance starts on a different
>>> >> > node of the cluster. The data consumption part works fine, as the new
>>> >> > instance also starts consuming from the same topic as the first one.
>>> >> > But when I try the http query, the metadata fetch fails ..
>>> >> >
>>> >> > I have a code snippet like this as part of the query that tries to
>>> >> > fetch the metadata, so that I can locate the host to query ..
>>> >> >
>>> >> >     metadataService.streamsMetadataForStoreAndKey(store, hostKey, stringSerializer) match {
>>> >> >       case Success(host) => {
>>> >> >         // hostKey is on another instance. call the other instance to fetch the data.
>>> >> >         if (!thisHost(host)) {
>>> >> >           logger.warn(s"Key $hostKey is on another instance not on $host - requerying ..")
>>> >> >           httpRequester.queryFromHost[Long](host, path)
>>> >> >         } else {
>>> >> >           // hostKey is on this instance
>>> >> >           localStateStoreQuery.queryStateStore(streams, store, hostKey)
>>> >> >         }
>>> >> >       }
>>> >> >       case Failure(ex) => Future.failed(ex)
>>> >> >     }
>>> >> >
>>> >> > and metadataService.streamsMetadataForStoreAndKey has the following
>>> >> > call ..
>>> >> >
>>> >> >     streams.metadataForKey(store, key, serializer) match {
>>> >> >       case null => throw new IllegalArgumentException(s"Metadata for key $key not found in $store")
>>> >> >       case metadata => new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toSet)
>>> >> >     }
>>> >> >
>>> >> > When I start the second instance, streams.metadataForKey returns null
>>> >> > for any key I pass .. Here's the relevant stack trace ..
>>> >> >
>>> >> > java.lang.IllegalArgumentException: Metadata for key mtc.clark.net not found in access-count-per-host
>>> >> >         at com.xx.fdp.sample.kstream.services.MetadataService.$anonfun$streamsMetadataForStoreAndKey$1(MetadataService.scala:51)
>>> >> >         at scala.util.Try$.apply(Try.scala:209)
>>> >> >         at com.xx.fdp.sample.kstream.services.MetadataService.streamsMetadataForStoreAndKey(MetadataService.scala:46)
>>> >> >         at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchSummaryInfo(KeyValueFetcher.scala:36)
>>> >> >         at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchAccessCountSummary(KeyValueFetcher.scala:29)
>>> >> >         at com.xx.fdp.sample.kstream.http.WeblogDSLHttpService.$anonfun$routes$10(WeblogDSLHttpService.scala:41)
>>> >> >         at akka.http.scaladsl.server.directives.RouteDirectives.$anonfun$complete$1(RouteDirectives.scala:47)
>>> >> >         at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
>>> >> >         at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
>>> >> >         ...
>>> >> >
>>> >> > and following this exception I get another one, which looks like an
>>> >> > internal exception that stops the application ..
>>> >> >
>>> >> > 09:57:33.731 TKD [StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Streams application error during processing:
>>> >> > java.lang.IllegalStateException: Attempt to retrieve exception from future which hasn't failed
>>> >> >         at org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
>>> >> >         at org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
>>> >> >         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
>>> >> >         at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>>> >> >         at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>>> >> >         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
>>> >> >         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>> >> >         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>> >> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>> >> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>> >> >         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>> >> >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>> >> >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>> >> >
>>> >> > BTW the application runs fine when I have 2 instances running on the
>>> >> > same host (my laptop), or 1 instance on the cluster. The problem
>>> >> > surfaces only when I start the second instance.
>>> >> >
>>> >> > Any help / pointers / areas to look into?
>>> >> >
>>> >> > --
>>> >> > Debasish Ghosh
>>> >> > http://manning.com/ghosh2
>>> >> > http://manning.com/ghosh
>>> >> >
>>> >> > Twttr: @debasishg
>>> >> > Blog: http://debasishg.blogspot.com
>>> >> > Code: http://github.com/debasishg
>>> >> >
>>> >>
>>> >
>>> >
>>> >
>>> >
>>>
>>>
>>>
>>>
>>


