Hi,
I am using a Kafka Streams app that connects to a Confluent Kafka cluster (10.2.1).
The application reads messages from a topic, performs a transformation,
and pushes the result to an output topic. No count or aggregation is performed.
I have the following questions regarding the state directory.

*1)* Will any data be written to the state directory?
When I checked the state directory, it showed:
0
0
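
One way to double-check this from inside the container is to total the bytes of all regular files under the state directory. This is a minimal, stdlib-only sketch; the fallback path `/tmp/kafka-streams` is an assumption, so pass the configured STATE_DIRECTORY as the first argument instead.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

final class StateDirUsage {
    // Returns the total size in bytes of all regular files under dir (0 if dir does not exist).
    static long totalBytes(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return 0L;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            return paths.filter(Files::isRegularFile)
                        .mapToLong(p -> {
                            try {
                                return Files.size(p);
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        })
                        .sum();
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical fallback path; pass the configured STATE_DIRECTORY as args[0] instead.
        Path dir = Paths.get(args.length > 0 ? args[0] : "/tmp/kafka-streams");
        System.out.println(dir + ": " + totalBytes(dir) + " bytes");
    }
}
```

Only files are counted, so a directory tree that contains nothing but empty subdirectories also reports 0.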

*2)* The application runs in Kubernetes without any external volumes.
Will the state directory cause any processing issues when a Kubernetes pod
restarts?

*3)* Will the app create a changelog topic, given that no in-memory
store is used in the app?
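
Kafka Streams names its internal topics `<application.id>-<name>-changelog` and `<application.id>-<name>-repartition`, so one way to see whether the app created any is to filter the cluster's topic list by that convention. A minimal sketch, assuming the topic names have already been fetched (for example with `kafka-topics --list`); the application id and topic names in `main` are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

final class InternalTopics {
    // Keeps only topic names that follow the Kafka Streams internal-topic naming
    // convention for the given application id (changelog or repartition topics).
    static List<String> internalTopicsFor(String applicationId, List<String> allTopics) {
        String prefix = applicationId + "-";
        return allTopics.stream()
                .filter(t -> t.startsWith(prefix))
                .filter(t -> t.endsWith("-changelog") || t.endsWith("-repartition"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical application id and topic list.
        List<String> topics = List.of("offers-input", "items-output", "items-quarantine");
        System.out.println(internalTopicsFor("offer-transformer", topics)); // prints []
    }
}
```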


Code Snippet
===========

Stream Config
=============

properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, streamEnvironment.getKafkaBootstrapServers());
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUMBER_OF_STREAM_THREADS);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);

properties.put(StreamsConfig.STATE_DIR_CONFIG, STATE_DIRECTORY);
properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
        ItemDeserializationExceptionHandler.class);
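
For reference, this sketch sets the same keys through a plain `java.util.Properties`, using the literal names the `StreamsConfig` constants hold (`bootstrap.servers`, `num.stream.threads`, `commit.interval.ms`, `state.dir`, `default.deserialization.exception.handler`). The thread count, commit interval and handler package are hypothetical placeholders, not the values this app uses.

```java
import java.util.Properties;

final class StreamsProps {
    // Builds the same configuration keys as the snippet above; all values are placeholders.
    static Properties build(String bootstrapServers, String stateDir) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers); // BOOTSTRAP_SERVERS_CONFIG
        props.setProperty("num.stream.threads", "2");              // NUM_STREAM_THREADS_CONFIG (hypothetical value)
        props.setProperty("commit.interval.ms", "1000");           // COMMIT_INTERVAL_MS_CONFIG (hypothetical value)
        props.setProperty("state.dir", stateDir);                  // STATE_DIR_CONFIG
        // DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; the package name is hypothetical.
        props.setProperty("default.deserialization.exception.handler",
                "com.example.ItemDeserializationExceptionHandler");
        return props;
    }

    public static void main(String[] args) {
        build("broker:9092", "/tmp/kafka-streams")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```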



Stream builder
=============

private void processStream(StreamsBuilder builder) {
    KStream<byte[], byte[]> input = builder.stream(inputTopic, Consumed.with(byteArraySerde, byteArraySerde))
            .peek((key, value) -> metricsClient.writeMetric(CountMetric.generate(METRIC_OFFER_INPUT, 1)));

    KStream<byte[], DeserializationResultWrapper> deserializationResultStream = input
            .mapValues(this::deserializeOffer);

    quarantineNonDeSerializableOffers(deserializationResultStream);

    KStream<byte[], List<TransformerResult>> trans = transformOffers(deserializationResultStream);

    produceToQuarantineTopic(trans);

    produceToOutputTopic(trans);
}

private void produceToOutputTopic(KStream<byte[], List<TransformerResult>> trans) {
    trans.filter((key, value) -> value != null && !value.isEmpty())
         .peek((key, value) -> metricsClient.writeMetric(CountMetric.generate(METRIC_ITEMS_OUTPUT, 1)))
         .flatMapValues(transformerResults -> transformerResults.stream()
                 .map(TransformerResult::getItem)
                 .filter(Objects::nonNull)
                 .collect(Collectors.toCollection(ArrayList::new)))
         .to(outputTopic, Produced.with(byteArraySerde, itemEnvelopeSerde));
}

private void produceToQuarantineTopic(KStream<byte[], List<TransformerResult>> trans) {
    trans.filter((key, value) -> value == null || value.isEmpty()
                                 || value.stream().anyMatch(TransformerResult::hasErrors))
         .mapValues(val -> toQuarantineEnvelope(val, INVALID_SKU))
         .to(quarantineTopic, Produced.with(byteArraySerde, quarantineItemEnvelopeSerde));
}
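
Both branches are stateless filters over the same `trans` stream. This sketch restates the two predicates in plain Java, with a hypothetical `Result` class standing in for `TransformerResult` (only `getItem` and `hasErrors` are modelled). Read this way, a non-empty list that contains at least one result with errors passes both filters, so it is written to both the output and the quarantine topic.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class RoutingSketch {
    // Hypothetical stand-in for TransformerResult: only the two members the filters use.
    static final class Result {
        private final String item;
        private final boolean errors;

        Result(String item, boolean errors) {
            this.item = item;
            this.errors = errors;
        }

        String getItem() { return item; }

        boolean hasErrors() { return errors; }
    }

    // Same condition as the filter in produceToOutputTopic.
    static boolean goesToOutput(List<Result> value) {
        return value != null && !value.isEmpty();
    }

    // Same condition as the filter in produceToQuarantineTopic.
    static boolean goesToQuarantine(List<Result> value) {
        return value == null || value.isEmpty()
                || value.stream().anyMatch(Result::hasErrors);
    }

    public static void main(String[] args) {
        List<Result> mixed = Arrays.asList(new Result("sku-1", false), new Result(null, true));
        // A non-empty list with at least one errored result satisfies both filters.
        System.out.println(goesToOutput(mixed) + " " + goesToQuarantine(mixed)); // true true
        List<Result> empty = Collections.emptyList();
        System.out.println(goesToOutput(empty) + " " + goesToQuarantine(empty)); // false true
    }
}
```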

Thanks
Pradeep
