Hi, I am using kafka streams app connecting to confluent kafka cluster(10.2.1). Application is reading messages from a topic, performing a tranformation and pushing to output topic . There is no count or aggregation performed . Have following clarifications regarding state directory.
*1)* Will there be any data written in state directory? When i verified the state directory , it was showing 0 0 *2)* Application is running in kubernetes without any external volumes . Will state directory cause any processing issue during kubernetes pod restarts? *3)* Will the app creates a changelog topic since there is no in memory store used in the app? Code Snippet =========== Stream Config ============= properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, streamEnvironment.getKafkaBootstrapServers()); properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUMBER_OF_STREAM_THREADS); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); properties.put(StreamsConfig.STATE_DIR_CONFIG, STATE_DIRECTORY); properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, ItemDeserializationExceptionHandler.class); Stream builder ============= private void processStream(StreamsBuilder builder) { KStream<byte[], byte[]> input = builder.stream(inputTopic, Consumed.with(byteArraySerde, byteArraySerde)) .peek((key, value) -> metricsClient.writeMetric( CountMetric.generate(METRIC_OFFER_INPUT, 1))); KStream<byte[], DeserializationResultWrapper> deserializationResultStream = input .mapValues(this::deserializeOffer); quarantineNonDeSerializableOffers(deserializationResultStream); KStream<byte[], List<TransformerResult>> trans = transformOffers(deserializationResultStream); produceToQuarantineTopic(trans); produceToOutputTopic(trans); } private void produceToOutputTopic(KStream<byte[], List<TransformerResult>> trans) { trans.filter((key, value) -> value != null && !value.isEmpty()) .peek((key, value) -> metricsClient.writeMetric(CountMetric.generate(METRIC_ITEMS_OUTPUT, 1))) .flatMapValues(transformerResults -> transformerResults.stream() .map(TransformerResult::getItem) .filter(Objects::nonNull) .collect(Collectors.toCollection(ArrayList::new))) .to(outputTopic, Produced.with(byteArraySerde, itemEnvelopeSerde)); } private void produceToQuarantineTopic(KStream<byte[], List<TransformerResult>> trans) { trans.filter((key, value) -> value == null || value.isEmpty() || value.stream().anyMatch(TransformerResult::hasErrors)) .mapValues(val -> toQuarantineEnvelope(val, INVALID_SKU)) .to(quarantineTopic, Produced.with(byteArraySerde, quarantineItemEnvelopeSerde)); } Thanks Pradeep