javad87 commented on PR #21677:
URL: https://github.com/apache/kafka/pull/21677#issuecomment-4022711745

   @mjsax  Thanks for addressing this regression! I have many services that use 
MockProcessorContext for unit tests. It’s very useful because I only need to 
test the processor logic itself, rather than setting up a full 
TopologyTestDriver. However, due to this regression, many of my tests are now 
failing.
   
   I have two questions:
   
   1. Do you have an approximate timeline for when a fix might be available?
   
   2. If it takes longer to resolve, what would be the cleanest workaround that 
avoids changing a large number of unit tests?
   
   For example, I currently have the following setup for my tests:
   ```    
   @BeforeEach
   void setUp() throws Exception {
       var properties = new KafkaStreamsApplicationConfig();
       properties.setSchemaRegistryUrl("mock://localhost:8081");
       properties.setSchemaRegistryAutoRegisterSchemas(true);
       properties.setServiceId("notification-Id");
   
       this.ts = Instant.now().toEpochMilli();
       this.eventContext = new KafkaEventContext(new MockProducerFactory());
       this.eventContext.configure(properties);
       this.eventContext.getMessageTable().registerLexicon(Messages.class);
       this.channel = eventContext.getEventChannel();
   
       var notificationTopology = new NotificationTopology(properties, eventContext);
   
       this.context = new MockProcessorContext<>(properties);
       this.windicatorStore = Stores.keyValueStoreBuilder(
               Stores.inMemoryKeyValueStore(WINDICATOR_STORE),
               Serdes.String(), notificationTopology.getWindicatorSerde())
           .withLoggingDisabled()
           .build();
   
       this.windowEventStore = Stores.windowStoreBuilder(
               Stores.inMemoryWindowStore(WINDOW_EVENT_STORE, Duration.ofHours(30), Duration.ofHours(30), false),
               Serdes.String(), notificationTopology.getClassifiedEventSerde())
           .withLoggingDisabled()
           .build();
   
       this.windowEventStore.init(this.context.getStateStoreContext(), windowEventStore);
       this.windicatorStore.init(this.context.getStateStoreContext(), windicatorStore);
       this.context.addStateStore(windicatorStore);
       this.context.addStateStore(windowEventStore);
   
       this.aggregator = new Aggregator(this.channel);
       aggregator.init(this.context);
   }
   ```
   
   I also use MockProcessorContext to fire scheduled Punctuators and capture 
forwarded records, e.g.:
   
   ```
   final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
   final Punctuator punctuator = capturedPunctuator.getPunctuator();
   punctuator.punctuate(ts + Duration.ofMinutes(3 * 60 + 1).toMillis());
   // then
   assertEquals(1, context.forwarded().size());
   ```

