[
https://issues.apache.org/jira/browse/KAFKA-12542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308053#comment-17308053
]
Guozhang Wang commented on KAFKA-12542:
---------------------------------------
Hi [~sinhash],
I cannot tell whether the topic returned by `dlqTopic:outputTopic` is actually
named `OUTPUT_TOPIC`; I suspect there may be a naming mismatch.
BTW, I'd suggest sending such questions to the mailing list, where you will
get more visibility and help; the JIRA system is usually for reporting
confirmed bugs.
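For illustration, here is a minimal plain-Java sketch (no Kafka dependency) of the routing in the quoted topology: the sink chooses its topic per record, so a topic name is only computed for records that pass the filter. The class names `AvroDTO` and `ProcessedDTO` and the topic values `OUTPUT_TOPIC` and `DLQ_TOPIC` are hypothetical stand-ins, since the report does not show the real values of `dlqTopic` and `outputTopic`, and the `map` step is omitted. Whether TopologyTestDriver reports a topic as unknown for this reason is an assumption that this thread does not confirm.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Plain-JDK model of "filter, then route by value type"; it does not use Kafka.
final class DynamicRoutingSketch {

    // Hypothetical topic names; the report does not show the real values.
    static final String OUTPUT_TOPIC = "OUTPUT_TOPIC";
    static final String DLQ_TOPIC = "DLQ_TOPIC";

    // Hypothetical stand-ins for the value types in the report.
    static class AvroDTO { }
    static class ProcessedDTO { }

    // Mirrors the routing lambda: v instanceof AvroDTO ? dlqTopic : outputTopic
    static String route(Object value) {
        return value instanceof AvroDTO ? DLQ_TOPIC : OUTPUT_TOPIC;
    }

    // Pushes values through filter -> route and returns the topic names that were resolved.
    static Set<String> resolvedTopics(List<Object> values, Predicate<Object> filter) {
        Set<String> topics = new LinkedHashSet<>();
        for (Object value : values) {
            if (filter.test(value)) {
                // route() runs only for records that pass the filter
                topics.add(route(value));
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        List<Object> input = List.of(new ProcessedDTO(), new AvroDTO());
        System.out.println(resolvedTopics(input, v -> true));  // prints [OUTPUT_TOPIC, DLQ_TOPIC]
        System.out.println(resolvedTopics(input, v -> false)); // prints []
    }
}
```

With a filter that always returns false, no topic name is ever resolved in this model, which matches the case in which the reported test fails.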
> Unknown Output topic when Filter Processor is returning false
> -------------------------------------------------------------
>
> Key: KAFKA-12542
> URL: https://issues.apache.org/jira/browse/KAFKA-12542
> Project: Kafka
> Issue Type: Bug
> Components: streams-test-utils
> Affects Versions: 2.6.0
> Reporter: SHWETA SINHA
> Priority: Major
>
> I am using the Kafka Streams DSL to create a topology.
> StreamsBuilder streamsBuilder = new StreamsBuilder();
> valueSerde.configure(getConfigForSpecificAvro(appId),false);
> KStream<String, AvroDTO> stream = streamsBuilder.stream(inputTopic,
>     Consumed.with(new Serdes.StringSerde(), valueSerde));
> KStream<String, AvroDTO> filtered = stream
>     .filter((key, value) -> ServiceConsumer.filter(key,value));
> filtered
>     .map((KeyValueMapper<String, AvroDTO, KeyValue<String, SpecificRecordBase>>)
>         (key, value) -> ServiceConsumer.process(key,value))
>     .to((k,v,recordContext) -> v instanceof AvroDTO ? dlqTopic:outputTopic,
>         Produced.with(new Serdes.StringSerde(), valueSerde));
> Topology topology = streamsBuilder.build();
> KafkaStreams kafkaStreams = new KafkaStreams(topology,getKafkaStreamsConfig(appId));
>
> To test the topology, I am using TopologyTestDriver.
> when(getKafkaStreamsConfig(any())).thenReturn(kafkaConfig);
> when(ServiceConsumer.filter(any(),any())).thenReturn(false);
> // when(ServiceConsumer.process(any(),any())).thenReturn(
> //     new KeyValue<>(statusDto.getTaskId().toString(),statusDto));
> topologyTestDriver = new TopologyTestDriver(getTopologyAndStartKafkaStreams(),kafkaConfig);
> StreamInput = topologyTestDriver.createInputTopic("INPUT_TOPIC",
>     new StringSerializer(), new KafkaAvroSerializer(schemaRegistryClient));
> UpdateOutput = topologyTestDriver.createOutputTopic("OUTPUT_TOPIC",
>     new StringDeserializer(), new KafkaAvroDeserializer(schemaRegistryClient,config));
> StreamInput.pipeInput("Hi");
> assertThat(UpdateOutput.isEmpty()).isTrue();
>
> I am checking that my output topic is empty when no messages pass the filter.
> I get the following error while unit testing:
> java.lang.IllegalArgumentException: Unknown topic: OUTPUT_TOPIC
>
> Changing when(ServiceConsumer.filter(any(),any())).thenReturn(false); to return
> true does not give any error.
>