ableegoldman commented on code in PR #18155:
URL: https://github.com/apache/kafka/pull/18155#discussion_r1883007521
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java:
##########
@@ -33,20 +38,31 @@ class KTableMapValues<KIn, VIn, VOut> implements KTableProcessorSupplier<KIn, VI
private final ValueMapperWithKey<? super KIn, ? super VIn, ? extends VOut> mapper;
private final String queryableName;
private boolean sendOldValues = false;
+ private final StoreFactory storeFactory;
KTableMapValues(final KTableImpl<KIn, ?, VIn> parent,
final ValueMapperWithKey<? super KIn, ? super VIn, ? extends VOut> mapper,
- final String queryableName) {
+ final String queryableName,
+ final StoreFactory storeFactory) {
this.parent = parent;
this.mapper = mapper;
this.queryableName = queryableName;
+ this.storeFactory = storeFactory;
}
@Override
public Processor<KIn, Change<VIn>, KIn, Change<VOut>> get() {
return new KTableMapValuesProcessor();
}
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ if (storeFactory == null) {
+ return Collections.emptySet();
Review Comment:
Doesn't really matter, but FYI, we can just return null here (the default `stores()` implementation returns null too).
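A minimal sketch of why `null` and an empty set are interchangeable here. It uses hypothetical stand-ins (`StoreProvider`, `EmptySetSupplier`, `NullSupplier`, `StoreRegistration`) with plain store names instead of Kafka's real `ProcessorSupplier`/`StoreBuilder` types, and it assumes the caller null-checks the result of `stores()` before registering anything, which is what makes returning `null` safe.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for a store-providing supplier (not Kafka's real interface).
interface StoreProvider {
    // Default means "no stores to connect", signalled with null.
    default Set<String> stores() {
        return null;
    }
}

// Variant 1: explicit empty set when there is no store to build.
class EmptySetSupplier implements StoreProvider {
    private final String storeName; // null means "not materialized"

    EmptySetSupplier(final String storeName) {
        this.storeName = storeName;
    }

    @Override
    public Set<String> stores() {
        if (storeName == null) {
            return Collections.emptySet();
        }
        return Collections.singleton(storeName);
    }
}

// Variant 2: return null, as suggested in the review comment.
class NullSupplier implements StoreProvider {
    private final String storeName; // null means "not materialized"

    NullSupplier(final String storeName) {
        this.storeName = storeName;
    }

    @Override
    public Set<String> stores() {
        return storeName == null ? null : Collections.singleton(storeName);
    }
}

// Hypothetical caller: registers whatever the supplier reports,
// treating a null result exactly like an empty set.
final class StoreRegistration {
    private StoreRegistration() { }

    static Set<String> register(final StoreProvider supplier) {
        final Set<String> registered = new LinkedHashSet<>();
        final Set<String> stores = supplier.stores();
        if (stores != null) {
            registered.addAll(stores);
        }
        return registered;
    }
}
```

With a null-checking caller, both variants register nothing when there is no store and the same single store otherwise, so the choice is purely stylistic.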
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java:
##########
@@ -62,21 +62,25 @@ public String toString() {
@SuppressWarnings("unchecked")
@Override
public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+ processorParameters.addProcessorTo(topologyBuilder, parentNodeNames());
+
final String processorName = processorParameters.processorName();
- topologyBuilder.addProcessor(processorName, processorParameters.processorSupplier(), parentNodeNames());
if (storeNames.length > 0) {
+ // todo(rodesai): remove me once all operators have been moved to ProcessorSupplier
topologyBuilder.connectProcessorAndStateStores(processorName, storeNames);
}
final KTableSource<K, V> tableSource =
processorParameters.processorSupplier() instanceof KTableSource ?
(KTableSource<K, V>) processorParameters.processorSupplier() :
null;
if (tableSource != null) {
+ // todo(rodesai): remove once KTableImpl#doJoinOnForeignKey moved to ProcessorSupplier
if (tableSource.materialized()) {
topologyBuilder.addStateStore(Objects.requireNonNull(storeFactory, "storeFactory was null"),
processorName);
}
Review Comment:
This is interesting. I'm not 100% sure, but I think we can actually remove this part right now, because Almog already did a PR to convert KTableSource to implement #stores.
I'm pretty sure I remember doing exactly this (removing this code) in my original rough-draft POC, so I'd be interested to see whether we can get rid of it right now. I guess try it and see if tests fail?
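To make the suggestion concrete, here is a toy model of why the special case may be redundant once KTableSource implements #stores. All class names (`ToySupplier`, `ToyTableSource`, `ToyTopologyBuilder`) are hypothetical and this is not Kafka's API: the builder only counts how often each store is added. The assumption being modelled is that the generic `addProcessorTo` path already adds whatever `stores()` reports, so the explicit `addStateStore` call for a materialized table source asks for the same store a second time.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a ProcessorSupplier that reports its own stores.
interface ToySupplier {
    Set<String> stores();
}

// Hypothetical stand-in for a KTableSource that already implements stores().
class ToyTableSource implements ToySupplier {
    private final String storeName;
    private final boolean materialized;

    ToyTableSource(final String storeName, final boolean materialized) {
        this.storeName = storeName;
        this.materialized = materialized;
    }

    boolean materialized() {
        return materialized;
    }

    String storeName() {
        return storeName;
    }

    @Override
    public Set<String> stores() {
        // A materialized source reports its store; otherwise it reports none.
        return materialized ? Collections.singleton(storeName) : Collections.<String>emptySet();
    }
}

// Hypothetical builder that only records how often each store is added.
class ToyTopologyBuilder {
    final Map<String, Integer> addStoreCalls = new LinkedHashMap<>();

    void addStateStore(final String storeName) {
        addStoreCalls.merge(storeName, 1, Integer::sum);
    }

    // Generic path: add whatever the supplier reports via stores().
    void addProcessor(final ToySupplier supplier) {
        for (final String store : supplier.stores()) {
            addStateStore(store);
        }
    }

    // Generic path plus the table-source special case discussed above.
    void addProcessorWithSpecialCase(final ToySupplier supplier) {
        addProcessor(supplier);
        if (supplier instanceof ToyTableSource && ((ToyTableSource) supplier).materialized()) {
            addStateStore(((ToyTableSource) supplier).storeName());
        }
    }
}
```

In this toy model the special case only repeats work the generic path has already done. Whether the real `TableProcessorNode` still needs it (for example for the foreign-key join path the second todo mentions) is exactly what running the tests would settle.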
##########
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java:
##########
@@ -1687,15 +1707,15 @@ public void shouldWrapProcessorsForStatelessOperators() {
.peek((k, v) -> { }, Named.as("peek")) // wrapped 4
.flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) // wrapped 5
.toTable(Named.as("toTable")) // should be wrapped when we do StreamToTableNode
- .filter((k, v) -> true, Named.as("filter-table")) // should be wrapped once we do TableProcessorNode
Review Comment:
To be honest, I had no idea this was a stateful operation when I wrote this test, as you can tell from the name `shouldWrapProcessorsForStatelessOperators`. We should either change the name or cut out the `.toTable().filter().toStream()` part.
Honestly, either is fine with me, since we have a separate test for the KTable filter now. That said, I kind of like the idea of having one test with multiple operators for the KTable stuff, since you never know how things might interact in the DSL. Maybe just move this part to a separate test, so we have one test that covers all the stateful KTable operators you're doing here (e.g. mapValues, filter, etc.).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.