hi,
trying the following program and want to see the metadata for
test_store, and nothing get back, the val metaIter =
streams.allMetadata().iterator() size is 0. I can see data in the store
though, but I need metadata so when I have multiple instance running. I can
find the right store.
is there any setting I missed?
Thanks,
Nan
val storeName = "test_store"
val streamProperties = new Properties()
streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG,
"streams-store-test5")
streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092")
val streamBuilder = new StreamsBuilder()
val inputStream = streamBuilder.stream[String, String]("test1")
val kvStore = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(storeName),
serialization.Serdes.String(),
serialization.Serdes.String(),
).withCachingDisabled()
streamBuilder.addStateStore(kvStore)
val transformer: TransformerSupplier[String, String, KeyValue[String,
String]]
= () => new SimpleTransformer(storeName)
inputStream.transform(transformer, storeName)
val topology = streamBuilder.build()
val streams = new KafkaStreams(topology, streamProperties)
Runtime.getRuntime.addShutdownHook(new Thread { () => streams.close() })
import scala.concurrent.ExecutionContext.Implicits.global
streams.start()
Thread.sleep(5000)
Future {
val metaIter = streams.allMetadata().iterator()
while(metaIter.hasNext){
println("===meta: " + metaIter.next())
}
}