Hmm, I'm not sure that is going to work, as both nodes will have the same
setting for StreamsConfig.APPLICATION_SERVER_CONFIG, i.e., 0.0.0.0:7070.
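
Each instance needs to advertise an address that the other instances can
actually reach, so application.server should be set to the node's routable
host rather than the 0.0.0.0 bind interface. Something along these lines
(just a sketch - HOST_IP is a hypothetical per-node environment variable
that you'd populate from your DC/OS service definition; the other names
are from your snippet):

    // HOST_IP: hypothetical env var holding this node's routable address;
    // fall back to the configured interface if it isn't set
    val advertisedHost = sys.env.getOrElse("HOST_IP", config.httpInterface)
    settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
      s"$advertisedHost:${config.httpPort}")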

On Fri, 28 Jul 2017 at 16:02 Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> The log file is a huge one, but I can send it to you. Before that, let
> me confirm one point ..
>
> I set the APPLICATION_SERVER_CONFIG to
> s"${config.httpInterface}:${config.httpPort}". In my case the
> httpInterface is "0.0.0.0" and the port is set to 7070. Since the two
> instances start on different nodes, this should be OK, right?
>
> regards.
>
> On Fri, Jul 28, 2017 at 8:18 PM, Damian Guy <damian....@gmail.com> wrote:
>
>> Do you have any logs that might help to work out what is going wrong?
>>
>> On Fri, 28 Jul 2017 at 14:16 Damian Guy <damian....@gmail.com> wrote:
>>
>>> The config looks ok to me
>>>
>>> On Fri, 28 Jul 2017 at 13:24 Debasish Ghosh <ghosh.debas...@gmail.com>
>>> wrote:
>>>
>>>> I am setting APPLICATION_SERVER_CONFIG, which is possibly what you are
>>>> referring to. Just now I noticed that I may also need to set
>>>> REPLICATION_FACTOR_CONFIG, which needs to be set to 2 (the default is 1).
>>>> Anything else that I may be missing?
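>>>>
>>>> i.e. something like this (a sketch - REPLICATION_FACTOR_CONFIG is the
>>>> StreamsConfig constant for the replication.factor property):
>>>>
>>>>     settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "2")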
>>>>
>>>>
>>>> regards.
>>>>
>>>> On Fri, Jul 28, 2017 at 5:46 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
>>>> wrote:
>>>>
>>>> > Hi Damian -
>>>> >
>>>> > I am not sure I understand what you mean .. I have the following set
>>>> > in the application .. Do I need to set anything else at the host
>>>> > level? An environment variable?
>>>> >
>>>> >     val streamingConfig = {
>>>> >       val settings = new Properties
>>>> >       settings.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>>> >         "kstream-weblog-processing")
>>>> >       settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
>>>> >
>>>> >       config.schemaRegistryUrl.foreach { url =>
>>>> >         settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, url)
>>>> >       }
>>>> >
>>>> >       settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>> >         Serdes.ByteArray.getClass.getName)
>>>> >       settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>> >         Serdes.String.getClass.getName)
>>>> >
>>>> >       // setting offset reset to earliest so that we can re-run the
>>>> >       // demo code with the same pre-loaded data
>>>> >       // Note: To re-run the demo, you need to use the offset reset tool:
>>>> >       // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
>>>> >       settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>>>> >
>>>> >       // need this for query service
>>>> >       settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
>>>> >         s"${config.httpInterface}:${config.httpPort}")
>>>> >
>>>> >       // default is /tmp/kafka-streams
>>>> >       settings.put(StreamsConfig.STATE_DIR_CONFIG, config.stateStoreDir)
>>>> >
>>>> >       // Set the commit interval to 500ms so that any changes are
>>>> >       // flushed frequently and the summary data are updated with low
>>>> >       // latency.
>>>> >       settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")
>>>> >
>>>> >       settings
>>>> >     }
>>>> >
>>>> > Please explain a bit ..
>>>> >
>>>> > regards.
>>>> >
>>>> >
>>>> > On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy <damian....@gmail.com>
>>>> > wrote:
>>>> >
>>>> >> Hi,
>>>> >>
>>>> >> Do you have the application.server property set appropriately for
>>>> >> both hosts?
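>>>> >>
>>>> >> i.e. each instance should advertise its own reachable host:port, for
>>>> >> example (a sketch - node1.example.com is a made-up hostname for the
>>>> >> first node):
>>>> >>
>>>> >>     settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "node1.example.com:7070")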
>>>> >>
>>>> >> The second stack trace is this bug:
>>>> >> https://issues.apache.org/jira/browse/KAFKA-5556
>>>> >>
>>>> >> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh <ghosh.debas...@gmail.com>
>>>> >> wrote:
>>>> >>
>>>> >> > Hi -
>>>> >> >
>>>> >> > In my Kafka Streams application, I have a state store resulting
>>>> >> > from a stateful streaming topology. The environment is
>>>> >> >
>>>> >> >    - Kafka 0.10.2.1
>>>> >> >    - It runs on a DC/OS cluster
>>>> >> >    - I am running Confluent-Kafka 3.2.2 on the cluster
>>>> >> >    - Each topic that I have has 2 partitions with replication factor = 2
>>>> >> >    - The application also has an associated http service that does
>>>> >> >      interactive queries on the state store
>>>> >> >
>>>> >> > The application runs fine when I invoke a single instance. I can
>>>> >> > use the http endpoints to do queries and everything looks good.
>>>> >> > Problems surface when I try to spawn another instance of the
>>>> >> > application. I use the same APPLICATION_ID_CONFIG and the instance
>>>> >> > starts on a different node of the cluster. The data consumption
>>>> >> > part works fine as the new instance also starts consuming from the
>>>> >> > same topic as the first one. But when I try the http query, the
>>>> >> > metadata fetch fails ..
>>>> >> >
>>>> >> > I have some code snippet like this as part of the query that tries
>>>> >> > to fetch the metadata so that I can locate the host to query on ..
>>>> >> >
>>>> >> >     metadataService.streamsMetadataForStoreAndKey(store, hostKey,
>>>> >> >         stringSerializer) match {
>>>> >> >       case Success(host) => {
>>>> >> >         // hostKey is on another instance. call the other instance to fetch the data.
>>>> >> >         if (!thisHost(host)) {
>>>> >> >           logger.warn(s"Key $hostKey is on another instance not on $host - requerying ..")
>>>> >> >           httpRequester.queryFromHost[Long](host, path)
>>>> >> >         } else {
>>>> >> >           // hostKey is on this instance
>>>> >> >           localStateStoreQuery.queryStateStore(streams, store, hostKey)
>>>> >> >         }
>>>> >> >       }
>>>> >> >       case Failure(ex) => Future.failed(ex)
>>>> >> >     }
>>>> >> >
>>>> >> > and the metadataService.streamsMetadataForStoreAndKey has the
>>>> >> > following call ..
>>>> >> >
>>>> >> >     streams.metadataForKey(store, key, serializer) match {
>>>> >> >       case null =>
>>>> >> >         throw new IllegalArgumentException(s"Metadata for key $key not found in $store")
>>>> >> >       case metadata =>
>>>> >> >         new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toSet)
>>>> >> >     }
>>>> >> >
>>>> >> > When I start the second instance, streams.metadataForKey returns
>>>> >> > null for any key I pass .. Here's the relevant stack trace ..
>>>> >> >
>>>> >> > java.lang.IllegalArgumentException: Metadata for key mtc.clark.net not found in access-count-per-host
>>>> >> >         at com.xx.fdp.sample.kstream.services.MetadataService.$anonfun$streamsMetadataForStoreAndKey$1(MetadataService.scala:51)
>>>> >> >         at scala.util.Try$.apply(Try.scala:209)
>>>> >> >         at com.xx.fdp.sample.kstream.services.MetadataService.streamsMetadataForStoreAndKey(MetadataService.scala:46)
>>>> >> >         at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchSummaryInfo(KeyValueFetcher.scala:36)
>>>> >> >         at com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchAccessCountSummary(KeyValueFetcher.scala:29)
>>>> >> >         at com.xx.fdp.sample.kstream.http.WeblogDSLHttpService.$anonfun$routes$10(WeblogDSLHttpService.scala:41)
>>>> >> >         at akka.http.scaladsl.server.directives.RouteDirectives.$anonfun$complete$1(RouteDirectives.scala:47)
>>>> >> >         at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
>>>> >> >         at akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
>>>> >> >         ...
>>>> >> >
>>>> >> > and following this exception I get another one which looks like an
>>>> >> > internal exception that stops the application ..
>>>> >> >
>>>> >> > 09:57:33.731 TKD [StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread -
>>>> >> > stream-thread [StreamThread-1] Streams application error during processing:
>>>> >> > java.lang.IllegalStateException: Attempt to retrieve exception from future which hasn't failed
>>>> >> >         at org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
>>>> >> >         at org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
>>>> >> >         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
>>>> >> >         at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
>>>> >> >         at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>>>> >> >         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
>>>> >> >         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>> >> >         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>>> >> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>>> >> >         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>>> >> >         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>>> >> >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>>> >> >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> >> >
>>>> >> > BTW the application runs fine when I have 2 instances running on
>>>> >> > the same host (my laptop), or a single instance on the cluster. The
>>>> >> > problem surfaces only when I start the second instance.
>>>> >> >
>>>> >> > Any help / pointer / area to look into?
>>>> >> >
>>>> >> > --
>>>> >> > Debasish Ghosh
>>>> >> > http://manning.com/ghosh2
>>>> >> > http://manning.com/ghosh
>>>> >> >
>>>> >> > Twttr: @debasishg
>>>> >> > Blog: http://debasishg.blogspot.com
>>>> >> > Code: http://github.com/debasishg
>>>> >> >
>>>> >>
>>>> >
>>>> >
>>>> >
>>>> > --
>>>> > Debasish Ghosh
>>>> > http://manning.com/ghosh2
>>>> > http://manning.com/ghosh
>>>> >
>>>> > Twttr: @debasishg
>>>> > Blog: http://debasishg.blogspot.com
>>>> > Code: http://github.com/debasishg
>>>> >
>>>>
>>>>
>>>>
>>>> --
>>>> Debasish Ghosh
>>>> http://manning.com/ghosh2
>>>> http://manning.com/ghosh
>>>>
>>>> Twttr: @debasishg
>>>> Blog: http://debasishg.blogspot.com
>>>> Code: http://github.com/debasishg
>>>>
>>>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
