Hi Nan,

You'll need to set the "application.server" property
(StreamsConfig.APPLICATION_SERVER_CONFIG) to a host:port pair for each Kafka
Streams instance; without it, streams#allMetadata() comes back empty.
Since you are calling that method, I'm assuming you are trying to set up
interactive queries, so here are two links that should help you get started:

https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#interactive-queries
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-remote-state-stores-for-the-entire-app
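
As a rough sketch of what that could look like with your settings (the
object and method names, and whatever host and port you pass in, are
placeholders of mine and not anything from your setup; this assumes the
2.x API where allMetadataForStore is available):

```scala
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import scala.collection.JavaConverters._

object MetadataSketch {
  // Same settings as in your program, plus application.server.
  // host and port are hypothetical: use whatever endpoint this
  // instance should advertise to the other instances.
  def streamProperties(host: String, port: Int): Properties = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-store-test5")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"$host:$port")
    props
  }

  // Prints the advertised endpoint of every instance hosting the store.
  def printStoreHosts(streams: KafkaStreams, storeName: String): Unit =
    streams.allMetadataForStore(storeName).asScala.foreach { meta =>
      println(s"$storeName is hosted on ${meta.host()}:${meta.port()}")
    }
}
```

Note that Kafka Streams only advertises this endpoint in the metadata; it
does not open the port or serve queries on it, so each instance would still
need its own RPC layer (REST or similar) listening there.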

HTH,
Bill


On Wed, Feb 27, 2019 at 10:46 PM Nan Xu <[email protected]> wrote:

> hi,
>
>    I'm trying the following program and want to see the metadata for
> test_store, but nothing comes back: the iterator from
> streams.allMetadata().iterator() is empty. I can see data in the store,
> but I need the metadata so that, when I have multiple instances running, I
> can find the right store.
> Is there any setting I missed?
>
> Thanks,
> Nan
>
>    val storeName = "test_store"
>   val streamProperties = new Properties()
>   streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG,
> "streams-store-test5")
>   streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092")
>
>   val streamBuilder = new StreamsBuilder()
>
>   val inputStream = streamBuilder.stream[String, String]("test1")
>
>   val kvStore = Stores.keyValueStoreBuilder(
>     Stores.persistentKeyValueStore(storeName),
>     serialization.Serdes.String(),
>     serialization.Serdes.String(),
>   ).withCachingDisabled()
>
>   streamBuilder.addStateStore(kvStore)
>
>   val transformer: TransformerSupplier[String, String, KeyValue[String,
> String]]
>   = () => new SimpleTransformer(storeName)
>
>   inputStream.transform(transformer, storeName)
>
>   val topology = streamBuilder.build()
>   val streams = new KafkaStreams(topology, streamProperties)
>   // Pass the closure to Thread as a Runnable so close() runs on shutdown
>   Runtime.getRuntime.addShutdownHook(new Thread(() => streams.close()))
>
>   import scala.concurrent.ExecutionContext.Implicits.global
>
>   streams.start()
>
>   Thread.sleep(5000)
>   Future {
>     val metaIter = streams.allMetadata().iterator()
>     while(metaIter.hasNext){
>       println("===meta: " + metaIter.next())
>     }
>   }
>
