javad87 commented on PR #21677:
URL: https://github.com/apache/kafka/pull/21677#issuecomment-4022711745
@mjsax Thanks for addressing this regression! I have many services that use
MockProcessorContext for unit tests. It’s very useful because I only need to
test the processor logic itself, rather than setting up a full
TopologyTestDriver. However, due to this regression, many of my tests are now
failing.
I have two questions:
1. Do you have an approximate timeline for when a fix might be available?
2. If it takes longer to resolve, what would be the cleanest workaround that
avoids changing a large number of unit tests?
For example, I currently have the following setup for my tests:
```
@BeforeEach
void setUp() throws Exception {
    var properties = new KafkaStreamsApplicationConfig();
    properties.setSchemaRegistryUrl("mock://localhost:8081");
    properties.setSchemaRegistryAutoRegisterSchemas(true);
    properties.setServiceId("notification-Id");

    this.ts = Instant.now().toEpochMilli();
    this.eventContext = new KafkaEventContext(new MockProducerFactory());
    this.eventContext.configure(properties);
    this.eventContext.getMessageTable().registerLexicon(Messages.class);
    this.channel = eventContext.getEventChannel();

    var notificationTopology = new NotificationTopology(properties, eventContext);
    this.context = new MockProcessorContext<>(properties);

    this.windicatorStore = Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore(WINDICATOR_STORE),
            Serdes.String(),
            notificationTopology.getWindicatorSerde())
        .withLoggingDisabled()
        .build();
    this.windowEventStore = Stores.windowStoreBuilder(
            Stores.inMemoryWindowStore(WINDOW_EVENT_STORE, Duration.ofHours(30),
                Duration.ofHours(30), false),
            Serdes.String(),
            notificationTopology.getClassifiedEventSerde())
        .withLoggingDisabled()
        .build();

    this.windowEventStore.init(this.context.getStateStoreContext(), windowEventStore);
    this.windicatorStore.init(this.context.getStateStoreContext(), windicatorStore);
    this.context.addStateStore(windicatorStore);
    this.context.addStateStore(windowEventStore);

    this.aggregator = new Aggregator(this.channel);
    aggregator.init(this.context);
}
```
I also use MockProcessorContext to capture the scheduled Punctuator and the
forwarded records, e.g.:
```
final MockProcessorContext.CapturedPunctuator capturedPunctuator =
    context.scheduledPunctuators().get(0);
final Punctuator punctuator = capturedPunctuator.getPunctuator();
punctuator.punctuate(ts + Duration.ofMinutes(3 * 60 + 1).toMillis());

// then
assertEquals(1, context.forwarded().size());
```
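To make the timestamp in the snippet above concrete: the punctuator is invoked at `ts` plus 181 minutes (3 hours and 1 minute). The sketch below only reproduces that arithmetic with `java.time`; the class and method names (`PunctuationTime`, `punctuateAt`) are hypothetical and not part of the test class or of Kafka Streams.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration only; not taken from the test class above.
final class PunctuationTime {
    // Same offset expression as in the snippet: 3 hours plus 1 minute.
    static final Duration OFFSET = Duration.ofMinutes(3 * 60 + 1);

    // Timestamp (epoch millis) that would be passed to Punctuator#punctuate.
    static long punctuateAt(long startMillis) {
        return startMillis + OFFSET.toMillis();
    }

    public static void main(String[] args) {
        long ts = Instant.now().toEpochMilli();
        System.out.println("offset ms = " + OFFSET.toMillis()); // offset ms = 10860000
        System.out.println("punctuate at = " + Instant.ofEpochMilli(punctuateAt(ts)));
    }
}
```

Naming the offset like this would keep the test's intent (fire just after the three-hour mark) readable if the same value is needed in several tests.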