[kafka] branch trunk updated: KAFKA-6455: Session Aggregation should use window-end-time as record timestamp (#6645)

2019-05-12 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 8a237f5  KAFKA-6455: Session Aggregation should use window-end-time as 
record timestamp (#6645)
8a237f5 is described below

commit 8a237f599afa539868a138b5a2534dbf884cb4ec
Author: Matthias J. Sax 
AuthorDate: Mon May 13 00:31:44 2019 +0200

KAFKA-6455: Session Aggregation should use window-end-time as record 
timestamp (#6645)

For session-windows, the result record should have the window-end timestamp 
as record timestamp.

Rebased to resolve merge conflicts. Removed unused classes TupleForwarder 
and ForwardingCacheFlushListener (replace with TimestampedTupleForwarder, 
SessionTupleForwarder, TimestampedCacheFlushListerner, and 
SessionCacheFlushListener)

Reviewers: John Roesler , Bruno Cadonna 
, Boyang Chen , Bill Bejeck 
, Guozhang Wang 
---
 .../internals/KStreamSessionWindowAggregate.java   |  10 +-
 ...istener.java => SessionCacheFlushListener.java} |   9 +-
 ...leForwarder.java => SessionTupleForwarder.java} |  21 +++--
 .../processor/internals/ProcessorContextImpl.java  |  11 ++-
 .../internals/ProcessorRecordContext.java  |   6 +-
 .../state/internals/AbstractStoreBuilder.java  |   4 +-
 .../state/internals/CachingSessionStore.java   |   1 -
 .../state/internals/SessionStoreBuilder.java   |  19 ++--
 .../KStreamAggregationIntegrationTest.java | 101 ++---
 .../kstream/internals/KGroupedStreamImplTest.java  |  95 ---
 ...KStreamSessionWindowAggregateProcessorTest.java |  88 ++
 .../internals/SessionCacheFlushListenerTest.java   |  52 +++
 ...derTest.java => SessionTupleForwarderTest.java} |  34 ---
 .../internals/SessionWindowedKStreamImplTest.java  |  93 ++-
 .../kstream/internals/SuppressScenarioTest.java|  37 +---
 .../state/internals/CachingSessionStoreTest.java   |  91 ++-
 .../state/internals/SessionStoreBuilderTest.java   |  50 ++
 .../java/org/apache/kafka/test/MockProcessor.java  |   9 ++
 .../kafka/streams/scala/kstream/KTableTest.scala   |  20 +++-
 19 files changed, 531 insertions(+), 220 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 3168393..68dc4a7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -78,7 +78,7 @@ public class KStreamSessionWindowAggregate 
implements KStreamAggProce
 private class KStreamSessionWindowAggregateProcessor extends 
AbstractProcessor {
 
 private SessionStore store;
-private TupleForwarder, Agg> tupleForwarder;
+private SessionTupleForwarder tupleForwarder;
 private StreamsMetricsImpl metrics;
 private InternalProcessorContext internalProcessorContext;
 private Sensor lateRecordDropSensor;
@@ -93,7 +93,7 @@ public class KStreamSessionWindowAggregate 
implements KStreamAggProce
 lateRecordDropSensor = 
Sensors.lateRecordDropSensor(internalProcessorContext);
 
 store = (SessionStore) context.getStateStore(storeName);
-tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<>(context), sendOldValues);
+tupleForwarder = new SessionTupleForwarder<>(store, context, new 
SessionCacheFlushListener<>(context), sendOldValues);
 }
 
 @Override
@@ -109,10 +109,10 @@ public class KStreamSessionWindowAggregate 
implements KStreamAggProce
 return;
 }
 
-observedStreamTime = Math.max(observedStreamTime, 
context().timestamp());
+final long timestamp = context().timestamp();
+observedStreamTime = Math.max(observedStreamTime, timestamp);
 final long closeTime = observedStreamTime - 
windows.gracePeriodMs();
 
-final long timestamp = context().timestamp();
 final List, Agg>> merged = new ArrayList<>();
 final SessionWindow newSessionWindow = new 
SessionWindow(timestamp, timestamp);
 SessionWindow mergedWindow = newSessionWindow;
@@ -148,7 +148,7 @@ public class KStreamSessionWindowAggregate 
implements KStreamAggProce
 context().topic(),
 context().partition(),
 context().offset(),
-context().timestamp(),
+timestamp,
 mergedWindow.start(),
 mergedWindow.end(),
 closeTime,
diff --git 

[kafka] branch trunk updated: KAFKA-6521: Use timestamped stores for KTables (#6667)

2019-05-12 Thread mjsax
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 8649717  KAFKA-6521: Use timestamped stores for KTables (#6667)
8649717 is described below

commit 8649717d6dde081c75fc441a56f63ee1556dc758
Author: Matthias J. Sax 
AuthorDate: Sun May 12 11:50:55 2019 +0200

KAFKA-6521: Use timestamped stores for KTables (#6667)

Reviewers: John Roesler , Boyang Chen 
, Bill Bejeck , Guozhang Wang 

---
 .../kstream/internals/KGroupedStreamImpl.java  |   2 +-
 .../kstream/internals/KGroupedTableImpl.java   |   2 +-
 .../kstream/internals/KStreamAggregate.java|  38 +++--
 .../streams/kstream/internals/KStreamReduce.java   |  37 ++--
 .../kstream/internals/KStreamWindowAggregate.java  |  40 ++---
 .../streams/kstream/internals/KTableAggregate.java |  22 ++-
 .../streams/kstream/internals/KTableFilter.java|  22 ++-
 .../streams/kstream/internals/KTableImpl.java  |  17 +-
 .../kstream/internals/KTableKTableJoinMerger.java  |  17 +-
 .../streams/kstream/internals/KTableMapValues.java |  22 ++-
 .../KTableMaterializedValueGetterSupplier.java |  14 +-
 .../streams/kstream/internals/KTableReduce.java|  22 ++-
 .../streams/kstream/internals/KTableSource.java|  22 ++-
 .../internals/KTableSourceValueGetterSupplier.java |  16 +-
 .../kstream/internals/KTableTransformValues.java   |  29 ++--
 .../internals/SessionWindowedKStreamImpl.java  |   3 +-
 .../kstream/internals/TimeWindowedKStreamImpl.java |  16 +-
 .../internals/TimestampedCacheFlushListener.java   |  53 ++
 ...a => TimestampedKeyValueStoreMaterializer.java} |  11 +-
 ...rwarder.java => TimestampedTupleForwarder.java} |  25 +--
 .../streams/kstream/internals/TupleForwarder.java  |  15 +-
 .../internals/graph/KTableKTableJoinNode.java  |  25 +--
 .../internals/graph/TableProcessorNode.java|   8 +-
 .../kstream/internals/graph/TableSourceNode.java   |   9 +-
 .../org/apache/kafka/streams/processor/To.java |  23 +++
 .../internals/GlobalProcessorContextImpl.java  |  10 +-
 .../processor/internals/ProcessorContextImpl.java  |   6 +-
 .../kafka/streams/state/ValueAndTimestamp.java |  14 +-
 .../state/internals/CachingKeyValueStore.java  |   6 +-
 .../internals/MeteredTimestampedKeyValueStore.java |   2 +-
 .../internals/TimestampedKeyValueStoreBuilder.java | 100 ++-
 .../internals/TimestampedWindowStoreBuilder.java   | 107 +++-
 .../kstream/internals/KTableReduceTest.java|  14 +-
 .../internals/KTableTransformValuesTest.java   |   6 +-
 .../TimestampedCacheFlushListenerTest.java |  75 
 ...est.java => TimestampedTupleForwarderTest.java} |  31 ++--
 .../kstream/internals/TupleForwarderTest.java  |  19 ++-
 .../internals/GlobalStreamThreadTest.java  |   4 +-
 .../processor/internals/StandbyTaskTest.java   |  30 ++--
 ... TimestampedKeyValueStoreMaterializerTest.java} |  50 +++---
 .../org/apache/kafka/streams/state/StoresTest.java |  32 
 .../TimestampedKeyValueStoreBuilderTest.java   |   8 +-
 .../TimestampedWindowStoreBuilderTest.java |   7 +-
 .../GenericInMemoryTimestampedKeyValueStore.java   | 190 +
 44 files changed, 946 insertions(+), 275 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index eab1e8f..6f6521c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -185,7 +185,7 @@ class KGroupedStreamImpl extends AbstractStream 
implements KGroupedS
  final MaterializedInternal> materializedInternal) {
 return aggregateBuilder.build(
 functionName,
-new 
KeyValueStoreMaterializer<>(materializedInternal).materialize(),
+new 
TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize(),
 aggregateSupplier,
 materializedInternal.queryableStoreName(),
 materializedInternal.keySerde(),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 56be0f6..75c998a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -88,7 +88,7 @@ public class KGroupedTableImpl extends 
AbstractStream implements KGr
 final StatefulProcessorNode statefulProcessorNode = new 
StatefulProcessorNode<>(