[ https://issues.apache.org/jira/browse/KAFKA-12542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
SHWETA SINHA updated KAFKA-12542:
---------------------------------
    Summary: Unknown Output topic when Filter Processor is returning false  (was: TopologyTestDriver)

> Unknown Output topic when Filter Processor is returning false
> -------------------------------------------------------------
>
>                 Key: KAFKA-12542
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12542
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams-test-utils
>    Affects Versions: 2.6.0
>            Reporter: SHWETA SINHA
>            Priority: Major
>
> I am using the Kafka Streams DSL to create a topology.
>
> StreamsBuilder streamsBuilder = new StreamsBuilder();
> valueSerde.configure(getConfigForSpecificAvro(appId), false);
> KStream<String, AvroDTO> stream = streamsBuilder.stream(inputTopic,
>     Consumed.with(new Serdes.StringSerde(), valueSerde));
> KStream<String, AvroDTO> filtered = stream
>     .filter((key, value) -> ServiceConsumer.filter(key, value));
> filtered
>     .map((KeyValueMapper<String, AvroDTO, KeyValue<String, SpecificRecordBase>>)
>         (key, value) -> ServiceConsumer.process(key, value))
>     .to((k, v, recordContext) -> v instanceof AvroDTO ? dlqTopic : outputTopic,
>         Produced.with(new Serdes.StringSerde(), valueSerde));
> Topology topology = streamsBuilder.build();
> KafkaStreams kafkaStreams = new KafkaStreams(topology, getKafkaStreamsConfig(appId));
>
> To test the topology, I am using TopologyTestDriver.
>
> when(getKafkaStreamsConfig(any())).thenReturn(kafkaConfig);
> when(ServiceConsumer.filter(any(), any())).thenReturn(false);
> // when(ServiceConsumer.process(any(), any())).thenReturn(new KeyValue<>(statusDto.getTaskId().toString(), statusDto));
> topologyTestDriver = new TopologyTestDriver(getTopologyAndStartKafkaStreams(), kafkaConfig);
> StreamInput = topologyTestDriver.createInputTopic("INPUT_TOPIC",
>     new StringSerializer(), new KafkaAvroSerializer(schemaRegistryClient));
> UpdateOutput = topologyTestDriver.createOutputTopic("OUTPUT_TOPIC",
>     new StringDeserializer(), new KafkaAvroDeserializer(schemaRegistryClient, config));
> StreamInput.pipeInput("Hi");
> assertThat(UpdateOutput.isEmpty()).isTrue();
>
> I am checking that the output topic is empty when every message is filtered out.
> The unit test fails with:
> java.lang.IllegalArgumentException: Unknown topic: OUTPUT_TOPIC
>
> Changing when(ServiceConsumer.filter(any(), any())).thenReturn(false); to return true
> does not produce any error.
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
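
One possible reading of the symptom, not confirmed anywhere in the report: the sink picks its topic per record through a lambda, so the topology holds no fixed output topic name, and the test driver may only learn that OUTPUT_TOPIC exists once a record has actually been written to it. The sketch below is a simplified, hypothetical model of that bookkeeping in plain Java. It is not the Kafka implementation, and every class and method name in it (OutputTopicModel, addStaticSink, write, isEmpty) is an assumption made for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical, simplified model of a test driver's output bookkeeping.
// This is NOT Kafka code; all names here are illustrative assumptions.
public class OutputTopicModel {
    // Topics named statically in the topology, e.g. a sink with a fixed topic name.
    private final Set<String> staticSinkTopics = new HashSet<>();
    // Queues are created lazily, the first time a record is written to a topic.
    private final Map<String, Queue<String>> recordsByTopic = new HashMap<>();

    public void addStaticSink(String topic) {
        staticSinkTopics.add(topic);
    }

    public void write(String topic, String value) {
        recordsByTopic.computeIfAbsent(topic, t -> new ArrayDeque<>()).add(value);
    }

    public boolean isEmpty(String topic) {
        Queue<String> queue = recordsByTopic.get(topic);
        if (queue == null) {
            // Never written to: only acceptable if the topic is a known static sink.
            if (!staticSinkTopics.contains(topic)) {
                throw new IllegalArgumentException("Unknown topic: " + topic);
            }
            return true;
        }
        return queue.isEmpty();
    }

    public static void main(String[] args) {
        OutputTopicModel driver = new OutputTopicModel();
        // Dynamic routing: no static sink is registered, and the filter
        // dropped the only record, so nothing was ever written.
        try {
            driver.isEmpty("OUTPUT_TOPIC");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        // Once a record passes the filter, the topic becomes known.
        driver.write("OUTPUT_TOPIC", "value");
        System.out.println(driver.isEmpty("OUTPUT_TOPIC"));
    }
}
```

In this model, a sink with a fixed topic name (addStaticSink) would make isEmpty return true for a never-written topic instead of throwing, which matches the report's observation that the error disappears once the filter lets a record through.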