Do you have any logs that might help to work out what is going wrong?

On Fri, 28 Jul 2017 at 14:16 Damian Guy <damian....@gmail.com> wrote:

> The config looks ok to me
>
> On Fri, 28 Jul 2017 at 13:24 Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> I am setting APPLICATION_SERVER_CONFIG, which is possibly what u r
>> referring to. Just now I noticed that I may also need to set
>> REPLICATION_FACTOR_CONFIG, which needs to be set to 2 (default is 1).
>> Anything else that I may be missing ?
>>
>>
>> regards.
>>
>> On Fri, Jul 28, 2017 at 5:46 PM, Debasish Ghosh <ghosh.debas...@gmail.com
>> >
>> wrote:
>>
>> > Hi Damien -
>> >
>> > I am not sure I understand what u mean .. I have the following set in
>> the
>> > application .. Do I need to set anything else at the host level ?
>> > Environment variable ?
>> >
>> >     val streamingConfig = {
>> >       val settings = new Properties
>> >       settings.put(StreamsConfig.APPLICATION_ID_CONFIG,
>> > "kstream-weblog-processing")
>> >       settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>> config.brokers)
>> >
>> >       config.schemaRegistryUrl.foreach{ url =>
>> >
>>  settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>> > url)
>> >       }
>> >
>> >       settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> > Serdes.ByteArray.getClass.getName)
>> >       settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> > Serdes.String.getClass.getName)
>> >
>> >       // setting offset reset to earliest so that we can re-run the demo
>> > code with the same pre-loaded data
>> >       // Note: To re-run the demo, you need to use the offset reset
>> tool:
>> >       // https://cwiki.apache.org/confluence/display/KAFKA/
>> > Kafka+Streams+Application+Reset+Tool
>> >       settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>> >
>> >       // need this for query service
>> >       settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
>> > s"${config.httpInterface}:${config.httpPort}")
>> >
>> >       // default is /tmp/kafka-streams
>> >       settings.put(StreamsConfig.STATE_DIR_CONFIG, config.stateStoreDir)
>> >
>> >       // Set the commit interval to 500ms so that any changes are
>> flushed
>> > frequently and the summary
>> >       // data are updated with low latency.
>> >       settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")
>> >
>> >       settings
>> >     }
>> >
>> > Please explain a bit ..
>> >
>> > regards.
>> >
>> >
>> > On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy <damian....@gmail.com>
>> wrote:
>> >
>> >> Hi,
>> >>
>> >> Do you have the application.server property set appropriately for both
>> >> hosts?
>> >>
>> >> The second stack trace is this bug:
>> >> https://issues.apache.org/jira/browse/KAFKA-5556
>> >>
>> >> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh <ghosh.debas...@gmail.com>
>> >> wrote:
>> >>
>> >> > Hi -
>> >> >
>> >> > In my Kafka Streams application, I have a state store resulting from
>> a
>> >> > stateful streaming topology. The environment is
>> >> >
>> >> >    - Kafka 0.10.2.1
>> >> >    - It runs on a DC/OS cluster
>> >> >    - I am running Confluent-Kafka 3.2.2 on the cluster
>> >> >    - Each topic that I have has 2 partitions with replication factor
>> = 2
>> >> >    - The application also has an associated http service that does
>> >> >    interactive queries on the state store
>> >> >
>> >> > The application runs fine when I invoke a single instance. I can use
>> the
>> >> > http endpoints to do queries and everything looks good. Problems
>> surface
>> >> > when I try to spawn another instance of the application. I use the
>> same
>> >> > APPLICATION_ID_CONFIG and the instance starts on a different node of
>> the
>> >> > cluster. The data consumption part works fine as the new instance
>> also
>> >> > starts consuming from the same topic as the first one. But when I try
>> >> the
>> >> > http query, the metadata fetch fails ..
>> >> >
>> >> > I have some code snippet like this as part of the query that tries to
>> >> fetch
>> >> > the metadata so that I can locate the host to query on ..
>> >> >
>> >> >     metadataService.streamsMetadataForStoreAndKey(store, hostKey,
>> >> > stringSerializer) match {
>> >> >       case Success(host) => {
>> >> >         // hostKey is on another instance. call the other instance to
>> >> fetch
>> >> > the data.
>> >> >         if (!thisHost(host)) {
>> >> >           logger.warn(s"Key $hostKey is on another instance not on
>> >> $host -
>> >> > requerying ..")
>> >> >           httpRequester.queryFromHost[Long](host, path)
>> >> >         } else {
>> >> >           // hostKey is on this instance
>> >> >           localStateStoreQuery.queryStateStore(streams, store,
>> hostKey)
>> >> >         }
>> >> >       }
>> >> >       case Failure(ex) => Future.failed(ex)
>> >> >     }
>> >> >
>> >> > and the metadataService.streamsMetadataForStoreAndKey has the
>> following
>> >> > call ..
>> >> >
>> >> >     streams.metadataForKey(store, key, serializer) match {
>> >> >       case null => throw new IllegalArgumentException(s"Metadata for
>> >> key
>> >> > $key not found in $store")
>> >> >       case metadata => new HostStoreInfo(metadata.host,
>> metadata.port,
>> >> > metadata.stateStoreNames.asScala.toSet)
>> >> >     }
>> >> >
>> >> > When I start the second instance, streams.metadataForKey returns null
>> >> for
>> >> > any key I pass .. Here's the relevant stack trace ..
>> >> >
>> >> > java.lang.IllegalArgumentException: Metadata for key mtc.clark.net
>> not
>> >> > found in access-count-per-host
>> >> >         at
>> >> >
>> >> > com.xx.fdp.sample.kstream.services.MetadataService.$anonfun$
>> >> streamsMetadataForStoreAndKey$1(MetadataService.scala:51)
>> >> >         at scala.util.Try$.apply(Try.scala:209)
>> >> >         at
>> >> >
>> >> > com.xx.fdp.sample.kstream.services.MetadataService.streamsMe
>> >> tadataForStoreAndKey(MetadataService.scala:46)
>> >> >         at
>> >> >
>> >> > com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchSummaryI
>> >> nfo(KeyValueFetcher.scala:36)
>> >> >         at
>> >> >
>> >> > com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchAccessCo
>> >> untSummary(KeyValueFetcher.scala:29)
>> >> >         at
>> >> >
>> >> > com.xx.fdp.sample.kstream.http.WeblogDSLHttpService.$anonfun
>> >> $routes$10(WeblogDSLHttpService.scala:41)
>> >> >         at
>> >> >
>> >> > akka.http.scaladsl.server.directives.RouteDirectives.$anonfu
>> >> n$complete$1(RouteDirectives.scala:47)
>> >> >         at
>> >> >
>> >> > akka.http.scaladsl.server.StandardRoute$$anon$1.apply(Standa
>> >> rdRoute.scala:19)
>> >> >         at
>> >> >
>> >> > akka.http.scaladsl.server.StandardRoute$$anon$1.apply(Standa
>> >> rdRoute.scala:19)
>> >> >         ...
>> >> >
>> >> > and following this exception I get another one which looks like an
>> >> internal
>> >> > exception that stops the application ..
>> >> >
>> >> > 09:57:33.731 TKD [StreamThread-1] ERROR
>> o.a.k.s.p.internals.StreamThread
>> >> -
>> >> > stream-thread [StreamThread-1] Streams application error during
>> >> processing:
>> >> > java.lang.IllegalStateException: Attempt to retrieve exception from
>> >> future
>> >> > which hasn't failed
>> >> >         at
>> >> >
>> >> > org.apache.kafka.clients.consumer.internals.RequestFuture.
>> >> exception(RequestFuture.java:99)
>> >> >         at
>> >> >
>> >> > org.apache.kafka.clients.consumer.internals.RequestFuture.
>> >> isRetriable(RequestFuture.java:89)
>> >> >         at
>> >> >
>> >> > org.apache.kafka.clients.consumer.internals.ConsumerCoordina
>> >> tor.commitOffsetsSync(ConsumerCoordinator.java:590)
>> >> >         at
>> >> >
>> >> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(
>> >> KafkaConsumer.java:1124)
>> >> >         at
>> >> >
>> >> > org.apache.kafka.streams.processor.internals.StreamTask.
>> >> commitOffsets(StreamTask.java:296)
>> >> >         at
>> >> >
>> >> > org.apache.kafka.streams.processor.internals.StreamTask$1.
>> >> run(StreamTask.java:79)
>> >> >         at
>> >> >
>> >> > org.apache.kafka.streams.processor.internals.StreamsMetricsI
>> >> mpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>> >> >         at
>> >> >
>> >> > org.apache.kafka.streams.processor.internals.StreamTask.
>> >> commit(StreamTask.java:280)
>> >> >         at
>> >> >
>> >> > org.apache.kafka.streams.processor.internals.StreamThread.
>> >> commitOne(StreamThread.java:807)
>> >> >         at
>> >> >
>> >> > org.apache.kafka.streams.processor.internals.StreamThread.
>> >> commitAll(StreamThread.java:794)
>> >> >         at
>> >> >
>> >> > org.apache.kafka.streams.processor.internals.StreamThread.
>> >> maybeCommit(StreamThread.java:769)
>> >> >         at
>> >> >
>> >> > org.apache.kafka.streams.processor.internals.StreamThread.
>> >> runLoop(StreamThread.java:647)
>> >> >         at
>> >> >
>> >> > org.apache.kafka.streams.processor.internals.StreamThread.
>> >> run(StreamThread.java:361)
>> >> >
>> >> > BTW the application runs fine when I have 2 instances running on the
>> >> same
>> >> > host (my laptop) or on 1 instance on the cluster. The problem
>> surfaces
>> >> only
>> >> > when I start the second instance.
>> >> >
>> >> > Any help / pointer / area to look into ?
>> >> >
>> >> > --
>> >> > Debasish Ghosh
>> >> > http://manning.com/ghosh2
>> >> > http://manning.com/ghosh
>> >> >
>> >> > Twttr: @debasishg
>> >> > Blog: http://debasishg.blogspot.com
>> >> > Code: http://github.com/debasishg
>> >> >
>> >>
>> >
>> >
>> >
>> > --
>> > Debasish Ghosh
>> > http://manning.com/ghosh2
>> > http://manning.com/ghosh
>> >
>> > Twttr: @debasishg
>> > Blog: http://debasishg.blogspot.com
>> > Code: http://github.com/debasishg
>> >
>>
>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>

Reply via email to