[kafka] branch trunk updated: KAFKA-6455: Session Aggregation should use window-end-time as record timestamp (#6645)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 8a237f5 KAFKA-6455: Session Aggregation should use window-end-time as record timestamp (#6645) 8a237f5 is described below commit 8a237f599afa539868a138b5a2534dbf884cb4ec Author: Matthias J. Sax AuthorDate: Mon May 13 00:31:44 2019 +0200 KAFKA-6455: Session Aggregation should use window-end-time as record timestamp (#6645) For session-windows, the result record should have the window-end timestamp as record timestamp. Rebased to resolve merge conflicts. Removed unused classes TupleForwarder and ForwardingCacheFlushListener (replace with TimestampedTupleForwarder, SessionTupleForwarder, TimestampedCacheFlushListerner, and SessionCacheFlushListener) Reviewers: John Roesler , Bruno Cadonna , Boyang Chen , Bill Bejeck , Guozhang Wang --- .../internals/KStreamSessionWindowAggregate.java | 10 +- ...istener.java => SessionCacheFlushListener.java} | 9 +- ...leForwarder.java => SessionTupleForwarder.java} | 21 +++-- .../processor/internals/ProcessorContextImpl.java | 11 ++- .../internals/ProcessorRecordContext.java | 6 +- .../state/internals/AbstractStoreBuilder.java | 4 +- .../state/internals/CachingSessionStore.java | 1 - .../state/internals/SessionStoreBuilder.java | 19 ++-- .../KStreamAggregationIntegrationTest.java | 101 ++--- .../kstream/internals/KGroupedStreamImplTest.java | 95 --- ...KStreamSessionWindowAggregateProcessorTest.java | 88 ++ .../internals/SessionCacheFlushListenerTest.java | 52 +++ ...derTest.java => SessionTupleForwarderTest.java} | 34 --- .../internals/SessionWindowedKStreamImplTest.java | 93 ++- .../kstream/internals/SuppressScenarioTest.java| 37 +--- .../state/internals/CachingSessionStoreTest.java | 91 ++- .../state/internals/SessionStoreBuilderTest.java | 50 ++ .../java/org/apache/kafka/test/MockProcessor.java | 9 ++ .../kafka/streams/scala/kstream/KTableTest.scala | 20 +++- 19 files changed, 531 insertions(+), 220 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index 3168393..68dc4a7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -78,7 +78,7 @@ public class KStreamSessionWindowAggregate implements KStreamAggProce private class KStreamSessionWindowAggregateProcessor extends AbstractProcessor { private SessionStore store; -private TupleForwarder, Agg> tupleForwarder; +private SessionTupleForwarder tupleForwarder; private StreamsMetricsImpl metrics; private InternalProcessorContext internalProcessorContext; private Sensor lateRecordDropSensor; @@ -93,7 +93,7 @@ public class KStreamSessionWindowAggregate implements KStreamAggProce lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext); store = (SessionStore) context.getStateStore(storeName); -tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues); +tupleForwarder = new SessionTupleForwarder<>(store, context, new SessionCacheFlushListener<>(context), sendOldValues); } @Override @@ -109,10 +109,10 @@ public class KStreamSessionWindowAggregate implements KStreamAggProce return; } -observedStreamTime = Math.max(observedStreamTime, context().timestamp()); +final long timestamp = context().timestamp(); +observedStreamTime = Math.max(observedStreamTime, timestamp); final long closeTime = observedStreamTime - windows.gracePeriodMs(); -final long timestamp = context().timestamp(); final List, Agg>> merged = new ArrayList<>(); final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp); SessionWindow mergedWindow = newSessionWindow; @@ -148,7 +148,7 @@ public class KStreamSessionWindowAggregate implements KStreamAggProce context().topic(), context().partition(), context().offset(), -context().timestamp(), +timestamp, mergedWindow.start(), mergedWindow.end(), closeTime, diff --git
[kafka] branch trunk updated: KAFKA-6521: Use timestamped stores for KTables (#6667)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 8649717 KAFKA-6521: Use timestamped stores for KTables (#6667) 8649717 is described below commit 8649717d6dde081c75fc441a56f63ee1556dc758 Author: Matthias J. Sax AuthorDate: Sun May 12 11:50:55 2019 +0200 KAFKA-6521: Use timestamped stores for KTables (#6667) Reviewers: John Roesler , Boyang Chen , Bill Bejeck , Guozhang Wang --- .../kstream/internals/KGroupedStreamImpl.java | 2 +- .../kstream/internals/KGroupedTableImpl.java | 2 +- .../kstream/internals/KStreamAggregate.java| 38 +++-- .../streams/kstream/internals/KStreamReduce.java | 37 ++-- .../kstream/internals/KStreamWindowAggregate.java | 40 ++--- .../streams/kstream/internals/KTableAggregate.java | 22 ++- .../streams/kstream/internals/KTableFilter.java| 22 ++- .../streams/kstream/internals/KTableImpl.java | 17 +- .../kstream/internals/KTableKTableJoinMerger.java | 17 +- .../streams/kstream/internals/KTableMapValues.java | 22 ++- .../KTableMaterializedValueGetterSupplier.java | 14 +- .../streams/kstream/internals/KTableReduce.java| 22 ++- .../streams/kstream/internals/KTableSource.java| 22 ++- .../internals/KTableSourceValueGetterSupplier.java | 16 +- .../kstream/internals/KTableTransformValues.java | 29 ++-- .../internals/SessionWindowedKStreamImpl.java | 3 +- .../kstream/internals/TimeWindowedKStreamImpl.java | 16 +- .../internals/TimestampedCacheFlushListener.java | 53 ++ ...a => TimestampedKeyValueStoreMaterializer.java} | 11 +- ...rwarder.java => TimestampedTupleForwarder.java} | 25 +-- .../streams/kstream/internals/TupleForwarder.java | 15 +- .../internals/graph/KTableKTableJoinNode.java | 25 +-- .../internals/graph/TableProcessorNode.java| 8 +- .../kstream/internals/graph/TableSourceNode.java | 9 +- .../org/apache/kafka/streams/processor/To.java | 23 +++ .../internals/GlobalProcessorContextImpl.java | 10 +- .../processor/internals/ProcessorContextImpl.java | 6 +- .../kafka/streams/state/ValueAndTimestamp.java | 14 +- .../state/internals/CachingKeyValueStore.java | 6 +- .../internals/MeteredTimestampedKeyValueStore.java | 2 +- .../internals/TimestampedKeyValueStoreBuilder.java | 100 ++- .../internals/TimestampedWindowStoreBuilder.java | 107 +++- .../kstream/internals/KTableReduceTest.java| 14 +- .../internals/KTableTransformValuesTest.java | 6 +- .../TimestampedCacheFlushListenerTest.java | 75 ...est.java => TimestampedTupleForwarderTest.java} | 31 ++-- .../kstream/internals/TupleForwarderTest.java | 19 ++- .../internals/GlobalStreamThreadTest.java | 4 +- .../processor/internals/StandbyTaskTest.java | 30 ++-- ... TimestampedKeyValueStoreMaterializerTest.java} | 50 +++--- .../org/apache/kafka/streams/state/StoresTest.java | 32 .../TimestampedKeyValueStoreBuilderTest.java | 8 +- .../TimestampedWindowStoreBuilderTest.java | 7 +- .../GenericInMemoryTimestampedKeyValueStore.java | 190 + 44 files changed, 946 insertions(+), 275 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index eab1e8f..6f6521c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -185,7 +185,7 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS final MaterializedInternal> materializedInternal) { return aggregateBuilder.build( functionName, -new KeyValueStoreMaterializer<>(materializedInternal).materialize(), +new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize(), aggregateSupplier, materializedInternal.queryableStoreName(), materializedInternal.keySerde(), diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index 56be0f6..75c998a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -88,7 +88,7 @@ public class KGroupedTableImpl extends AbstractStream implements KGr final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>(